On 12/22/20 12:04 PM, Tang, Haiying wrote:
Hi Andrey,

There is an error report in your patch as follows. Please take a check.

https://travis-ci.org/github/postgresql-cfbot/postgresql/jobs/750682857#L1519

copyfrom.c:374:21: error: ‘save_cur_lineno’ is used uninitialized in this 
function [-Werror=uninitialized]

Regards,
Tang



Thank you,
see new version in attachment.

--
regards,
Andrey Lepikhov
Postgres Professional
>From e2bc0980f05061afe199de63b76b00020208510a Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Mon, 14 Dec 2020 13:37:40 +0500
Subject: [PATCH 2/2] Fast COPY FROM into the foreign or sharded table.

This feature enables bulk COPY into foreign table in the case of
multi inserts is possible and foreign table has non-zero number of columns.

FDWAPI was extended by next routines:
* BeginForeignCopy
* EndForeignCopy
* ExecForeignCopy

BeginForeignCopy and EndForeignCopy initialize and free
the CopyState of bulk COPY. The ExecForeignCopy routine send
'COPY ... FROM STDIN' command to the foreign server, in iterative
manner send tuples by CopyTo() machinery, send EOF to this connection.

Code that constructed list of columns for a given foreign relation
in the deparseAnalyzeSql() routine is separated to the deparseRelColumnList().
It is reused in the deparseCopyFromSql().

Added TAP-tests on the specific corner cases of COPY FROM STDIN operation.

By the analogy of CopyFrom() the CopyState structure was extended
with data_dest_cb callback. It is used for send text representation
of a tuple to a custom destination.
The PgFdwModifyState structure is extended with the cstate field.
It is needed for avoid repeated initialization of CopyState. ALso for this
reason CopyTo() routine was split into the set of routines CopyToStart()/
CopyTo()/CopyToFinish().

Enum CopyInsertMethod removed. This logic implements by ri_usesMultiInsert
field of the ResultRelInfo sructure.

Discussion:
https://www.postgresql.org/message-id/flat/3d0909dc-3691-a576-208a-90986e55489f%40postgrespro.ru

Authors: Andrey Lepikhov, Ashutosh Bapat, Amit Langote
---
 contrib/postgres_fdw/deparse.c                |  60 ++++++--
 .../postgres_fdw/expected/postgres_fdw.out    |  46 +++++-
 contrib/postgres_fdw/postgres_fdw.c           | 137 ++++++++++++++++++
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  45 ++++++
 doc/src/sgml/fdwhandler.sgml                  |  73 ++++++++++
 src/backend/commands/copy.c                   |   4 +-
 src/backend/commands/copyfrom.c               | 126 +++++++++-------
 src/backend/commands/copyto.c                 |  84 ++++++++---
 src/backend/executor/execMain.c               |   8 +-
 src/backend/executor/execPartition.c          |  27 +++-
 src/include/commands/copy.h                   |   8 +-
 src/include/foreign/fdwapi.h                  |  15 ++
 13 files changed, 540 insertions(+), 94 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ca2f9f3215..b2a71faabc 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -184,6 +184,8 @@ static void appendAggOrderBy(List *orderList, List *targetList,
 static void appendFunctionName(Oid funcid, deparse_expr_cxt *context);
 static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno,
 									deparse_expr_cxt *context);
+static List *deparseRelColumnList(StringInfo buf, Relation rel,
+								  bool enclose_in_parens);
 
 /*
  * Helper functions
@@ -1763,6 +1765,20 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 						 withCheckOptionList, returningList, retrieved_attrs);
 }
 
+/*
+ * Deparse COPY FROM into given buf.
+ * We need to use list of parameters at each query.
+ */
+void
+deparseCopyFromSql(StringInfo buf, Relation rel)
+{
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	(void) deparseRelColumnList(buf, rel, true);
+
+	appendStringInfoString(buf, " FROM STDIN ");
+}
+
 /*
  * deparse remote UPDATE statement
  *
@@ -2066,6 +2082,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
  */
 void
 deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
+{
+	appendStringInfoString(buf, "SELECT ");
+	*retrieved_attrs = deparseRelColumnList(buf, rel, false);
+
+	/* Don't generate bad syntax for zero-column relation. */
+	if (list_length(*retrieved_attrs) == 0)
+		appendStringInfoString(buf, "NULL");
+
+	/*
+	 * Construct FROM clause
+	 */
+	appendStringInfoString(buf, " FROM ");
+	deparseRelation(buf, rel);
+}
+
+/*
+ * Construct the list of columns of given foreign relation in the order they
+ * appear in the tuple descriptor of the relation. Ignore any dropped columns.
+ * Use column names on the foreign server instead of local names.
+ *
+ * Optionally enclose the list in parantheses.
+ */
+static List *
+deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens)
 {
 	Oid			relid = RelationGetRelid(rel);
 	TupleDesc	tupdesc = RelationGetDescr(rel);
@@ -2074,10 +2114,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 	List	   *options;
 	ListCell   *lc;
 	bool		first = true;
+	List	   *retrieved_attrs = NIL;
 
-	*retrieved_attrs = NIL;
-
-	appendStringInfoString(buf, "SELECT ");
 	for (i = 0; i < tupdesc->natts; i++)
 	{
 		/* Ignore dropped columns. */
@@ -2086,6 +2124,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 
 		if (!first)
 			appendStringInfoString(buf, ", ");
+		else if (enclose_in_parens)
+			appendStringInfoChar(buf, '(');
+
 		first = false;
 
 		/* Use attribute name or column_name option. */
@@ -2105,18 +2146,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 
 		appendStringInfoString(buf, quote_identifier(colname));
 
-		*retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
+		retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
 	}
 
-	/* Don't generate bad syntax for zero-column relation. */
-	if (first)
-		appendStringInfoString(buf, "NULL");
+	if (enclose_in_parens && list_length(retrieved_attrs) > 0)
+		appendStringInfoChar(buf, ')');
 
-	/*
-	 * Construct FROM clause
-	 */
-	appendStringInfoString(buf, " FROM ");
-	deparseRelation(buf, rel);
+	return retrieved_attrs;
 }
 
 /*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 2d88d06358..be8db5ac63 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8076,8 +8076,9 @@ copy rem2 from stdin;
 copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
-CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
-COPY rem2, line 1: "-1	xyzzy"
+CONTEXT:  COPY loc2, line 1: "-1	xyzzy"
+remote SQL command: COPY public.loc2(f1, f2) FROM STDIN 
+COPY rem2, line 2
 select * from rem2;
  f1 | f2  
 ----+-----
@@ -8088,6 +8089,19 @@ select * from rem2;
 alter foreign table rem2 drop constraint rem2_f1positive;
 alter table loc2 drop constraint loc2_f1positive;
 delete from rem2;
+create table foo (a int) partition by list (a);
+create table foo1 (like foo);
+create foreign table ffoo1 partition of foo for values in (1)
+	server loopback options (table_name 'foo1');
+create table foo2 (like foo);
+create foreign table ffoo2 partition of foo for values in (2)
+	server loopback options (table_name 'foo2');
+create function print_new_row() returns trigger language plpgsql as $$
+	begin raise notice '%', new; return new; end; $$;
+create trigger ffoo1_br_trig before insert on ffoo1
+	for each row execute function print_new_row();
+copy foo from stdin;
+NOTICE:  (1)
 -- Test local triggers
 create trigger trig_stmt_before before insert on rem2
 	for each statement execute procedure trigger_func();
@@ -8196,6 +8210,34 @@ drop trigger rem2_trig_row_before on rem2;
 drop trigger rem2_trig_row_after on rem2;
 drop trigger loc2_trig_row_before_insert on loc2;
 delete from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+ERROR:  column "f1" of relation "loc2" does not exist
+CONTEXT:  remote SQL command: COPY public.loc2(f1, f2) FROM STDIN 
+COPY rem2, line 3
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+ f1 | f2 
+----+----
+(0 rows)
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(2 rows)
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(4 rows)
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index b6c72e1d1e..dd185bdc3b 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,7 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/pg_class.h"
+#include "commands/copy.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -191,6 +192,7 @@ typedef struct PgFdwModifyState
 	/* for update row movement if subplan result rel */
 	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
 											 * created */
+	CopyToState cstate; /* foreign COPY state, if used */
 } PgFdwModifyState;
 
 /*
@@ -357,6 +359,13 @@ static void postgresBeginForeignInsert(ModifyTableState *mtstate,
 									   ResultRelInfo *resultRelInfo);
 static void postgresEndForeignInsert(EState *estate,
 									 ResultRelInfo *resultRelInfo);
+static void postgresBeginForeignCopy(ModifyTableState *mtstate,
+									   ResultRelInfo *resultRelInfo);
+static void postgresEndForeignCopy(EState *estate,
+									 ResultRelInfo *resultRelInfo);
+static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
+									  TupleTableSlot **slots,
+									  int nslots);
 static int	postgresIsForeignRelUpdatable(Relation rel);
 static bool postgresPlanDirectModify(PlannerInfo *root,
 									 ModifyTable *plan,
@@ -535,6 +544,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->EndForeignModify = postgresEndForeignModify;
 	routine->BeginForeignInsert = postgresBeginForeignInsert;
 	routine->EndForeignInsert = postgresEndForeignInsert;
+	routine->BeginForeignCopy = postgresBeginForeignCopy;
+	routine->EndForeignCopy = postgresEndForeignCopy;
+	routine->ExecForeignCopy = postgresExecForeignCopy;
 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
 	routine->PlanDirectModify = postgresPlanDirectModify;
 	routine->BeginDirectModify = postgresBeginDirectModify;
@@ -2052,6 +2064,131 @@ postgresEndForeignInsert(EState *estate,
 	finish_foreign_modify(fmstate);
 }
 
+static PgFdwModifyState *copy_fmstate = NULL;
+
+static void
+pgfdw_copy_dest_cb(void *buf, int len)
+{
+	PGconn *conn = copy_fmstate->conn;
+
+	if (PQputCopyData(conn, (char *) buf, len) <= 0)
+		pgfdw_report_error(ERROR, NULL, conn, false, copy_fmstate->query);
+}
+
+/*
+ *
+ * postgresBeginForeignCopy
+ *		Begin an COPY operation on a foreign table
+ */
+static void
+postgresBeginForeignCopy(ModifyTableState *mtstate,
+						   ResultRelInfo *resultRelInfo)
+{
+	PgFdwModifyState *fmstate;
+	StringInfoData sql;
+	RangeTblEntry *rte;
+	Relation rel = resultRelInfo->ri_RelationDesc;
+
+	rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state);
+	initStringInfo(&sql);
+	deparseCopyFromSql(&sql, rel);
+
+	fmstate = create_foreign_modify(mtstate->ps.state,
+									rte,
+									resultRelInfo,
+									CMD_INSERT,
+									NULL,
+									sql.data,
+									NIL,
+									false,
+									NIL);
+
+	fmstate->cstate = BeginCopyTo(NULL, NULL, RelationGetDescr(rel), NULL,
+								  InvalidOid, NULL, false, pgfdw_copy_dest_cb,
+								  NIL, NIL);
+	CopyToStart(fmstate->cstate);
+	resultRelInfo->ri_FdwState = fmstate;
+}
+
+/*
+ * postgresEndForeignCopy
+ *		Finish an COPY operation on a foreign table
+ */
+static void
+postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo)
+{
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+
+	/* Check correct use of CopyIn FDW API. */
+	Assert(fmstate->cstate != NULL);
+	CopyToFinish(fmstate->cstate);
+	pfree(fmstate->cstate);
+	fmstate->cstate = NULL;
+	finish_foreign_modify(fmstate);
+}
+
+/*
+ *
+ * postgresExecForeignCopy
+ *		Send a number of tuples to the foreign relation.
+ */
+static void
+postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
+						  TupleTableSlot **slots, int nslots)
+{
+	PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
+	PGresult *res;
+	PGconn *conn = fmstate->conn;
+	bool OK = false;
+	int i;
+
+	/* Check correct use of CopyIn FDW API. */
+	Assert(fmstate->cstate != NULL);
+	Assert(copy_fmstate == NULL);
+
+	res = PQexec(conn, fmstate->query);
+	if (PQresultStatus(res) != PGRES_COPY_IN)
+		pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+	PQclear(res);
+
+	PG_TRY();
+	{
+		copy_fmstate = fmstate;
+		for (i = 0; i < nslots; i++)
+			CopyOneRowTo(fmstate->cstate, slots[i]);
+
+		OK = true;
+	}
+	PG_FINALLY();
+	{
+		copy_fmstate = NULL; /* Detect problems */
+
+		/* Finish COPY IN protocol. It is needed to do after successful copy or
+		 * after an error.
+		 */
+		if (PQputCopyEnd(conn, OK ? NULL : _("canceled by server")) <= 0 ||
+			PQflush(conn))
+			pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+		/* After successfully  sending an EOF signal, check command OK. */
+		res = PQgetResult(conn);
+		if ((!OK && PQresultStatus(res) != PGRES_FATAL_ERROR) ||
+			(OK && PQresultStatus(res) != PGRES_COMMAND_OK))
+			pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+
+		PQclear(res);
+		/* Do this to ensure we've pumped libpq back to idle state */
+		if (PQgetResult(conn) != NULL)
+			ereport(ERROR,
+					(errmsg("unexpected extra results during COPY of table: %s",
+							PQerrorMessage(conn))));
+
+		if (!OK)
+			PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
 /*
  * postgresIsForeignRelUpdatable
  *		Determine whether a foreign table supports INSERT, UPDATE and/or
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..8fc5ff018f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
 							 List **retrieved_attrs);
+extern void deparseCopyFromSql(StringInfo buf, Relation rel);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 7581c5417b..22dcd12f02 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2212,6 +2212,23 @@ alter table loc2 drop constraint loc2_f1positive;
 
 delete from rem2;
 
+create table foo (a int) partition by list (a);
+create table foo1 (like foo);
+create foreign table ffoo1 partition of foo for values in (1)
+	server loopback options (table_name 'foo1');
+create table foo2 (like foo);
+create foreign table ffoo2 partition of foo for values in (2)
+	server loopback options (table_name 'foo2');
+create function print_new_row() returns trigger language plpgsql as $$
+	begin raise notice '%', new; return new; end; $$;
+create trigger ffoo1_br_trig before insert on ffoo1
+	for each row execute function print_new_row();
+
+copy foo from stdin;
+1
+2
+\.
+
 -- Test local triggers
 create trigger trig_stmt_before before insert on rem2
 	for each statement execute procedure trigger_func();
@@ -2312,6 +2329,34 @@ drop trigger loc2_trig_row_before_insert on loc2;
 
 delete from rem2;
 
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+1	foo
+2	bar
+\.
+
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 9c9293414c..a9a7402440 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -796,6 +796,79 @@ EndForeignInsert(EState *estate,
 
     <para>
 <programlisting>
+void
+BeginForeignCopy(ModifyTableState *mtstate,
+                   ResultRelInfo *rinfo);
+</programlisting>
+
+     Begin executing an copy operation on a foreign table. This routine is
+     called right before the first call of <function>ExecForeignCopy</function>
+     routine for the foreign table. It should perform any initialization needed
+     prior to the actual COPY FROM operation.
+     Subsequently, <function>ExecForeignCopy</function> will be called for
+     a bulk of tuples to be copied into the foreign table.
+    </para>
+
+    <para>
+     <literal>mtstate</literal> is the overall state of the
+     <structname>ModifyTable</structname> plan node being executed; global data about
+     the plan and execution state is available via this structure.
+     <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
+     the target foreign table.  (The <structfield>ri_FdwState</structfield> field of
+     <structname>ResultRelInfo</structname> is available for the FDW to store any
+     private state it needs for this operation.)
+    </para>
+
+    <para>
+     When this is called by a <command>COPY FROM</command> command, the
+     plan-related global data in <literal>mtstate</literal> is not provided.
+    </para>
+
+    <para>
+     If the <function>BeginForeignCopy</function> pointer is set to
+     <literal>NULL</literal>, no action is taken for the initialization.
+    </para>
+
+    <para>
+<programlisting>
+void
+EndForeignCopy(EState *estate,
+                 ResultRelInfo *rinfo);
+</programlisting>
+
+     End the copy operation and release resources.  It is normally not important
+     to release palloc'd memory, but for example open files and connections
+     to remote servers should be cleaned up.
+    </para>
+
+    <para>
+     If the <function>EndForeignCopy</function> pointer is set to
+     <literal>NULL</literal>, no action is taken for the termination.
+    </para>
+
+    <para>
+<programlisting>
+void
+ExecForeignCopy(ResultRelInfo *rinfo,
+                  TupleTableSlot **slots,
+                  int nslots);
+</programlisting>
+
+     Copy a bulk of tuples into the foreign table.
+      <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
+     the target foreign table.
+     <literal>slots</literal> contains the tuples to be inserted; it will match the
+     row-type definition of the foreign table.
+     <literal>nslots</literal> is a number of tuples in the <literal>slots</literal>
+    </para>
+
+    <para>
+     If the <function>ExecForeignCopy</function> pointer is set to
+     <literal>NULL</literal>, the <function>ExecForeignInsert</function> routine will be used to run COPY on the foreign table.
+    </para>
+
+    <para>
+<programlisting>
 int
 IsForeignRelUpdatable(Relation rel);
 </programlisting>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b6143b8bf2..32cff00762 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -303,8 +303,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	{
 		CopyToState cstate;
 
-		cstate = BeginCopyTo(pstate, rel, query, relid,
-							 stmt->filename, stmt->is_program,
+		cstate = BeginCopyTo(pstate, rel, NULL, query, relid,
+							 stmt->filename, stmt->is_program, NULL,
 							 stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 6d4f6cb80d..17aac24bdd 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -314,54 +314,63 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	cstate->line_buf_valid = false;
 	save_cur_lineno = cstate->cur_lineno;
 
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
+	if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+	{
+		/* Flush into foreign table or partition */
+		resultRelInfo->ri_FdwRoutine->ExecForeignCopy(resultRelInfo,
+														slots,
+														nused);
+	}
+	else
 	{
 		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
+		 * table_multi_insert may leak memory, so switch to short-lived memory
+		 * context before calling it.
 		 */
-		if (resultRelInfo->ri_NumIndices > 0)
-		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(resultRelInfo,
-									  buffer->slots[i], estate, false, NULL,
-									  NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
-		}
+		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+		table_multi_insert(resultRelInfo->ri_RelationDesc,
+						   slots,
+						   nused,
+						   mycid,
+						   ti_options,
+						   buffer->bistate);
+		MemoryContextSwitchTo(oldcontext);
 
-		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
-		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		for (i = 0; i < nused; i++)
 		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
-		}
+			/*
+			 * If there are any indexes, update them for all the inserted tuples,
+			 * and run AFTER ROW INSERT triggers.
+			 */
+			if (resultRelInfo->ri_NumIndices > 0)
+			{
+				List	   *recheckIndexes;
+
+				cstate->cur_lineno = buffer->linenos[i];
+				recheckIndexes =
+					ExecInsertIndexTuples(resultRelInfo, buffer->slots[i],
+										  estate, false, NULL, NIL);
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], recheckIndexes,
+									 cstate->transition_capture);
+				list_free(recheckIndexes);
+			}
+
+			/*
+			 * There's no indexes, but see if we need to run AFTER ROW INSERT
+			 * triggers anyway.
+			 */
+			else if (resultRelInfo->ri_TrigDesc != NULL &&
+					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+			{
+				cstate->cur_lineno = buffer->linenos[i];
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], NIL, cstate->transition_capture);
+			}
 
-		ExecClearTuple(slots[i]);
+			ExecClearTuple(slots[i]);
+		}
 	}
 
 	/* Mark that all slots are free */
@@ -666,8 +675,11 @@ CopyFrom(CopyFromState cstate)
 	 * checked by calling ExecSetRelationUsesMultiInsert().  It does not matter
 	 * whether partitions have any volatile default expressions as we use the
 	 * defaults from the target of the COPY command.
+	 * Also, the COPY command requires a non-zero input list of attributes.
+	 * Therefore, the length of the attribute list is checked here.
 	 */
 	if (!cstate->volatile_defexprs &&
+		list_length(cstate->attnumlist) > 0 &&
 		!contain_volatile_functions(cstate->whereClause))
 		target_resultRelInfo->ri_usesMultiInsert =
 					ExecSetRelationUsesMultiInsert(target_resultRelInfo, NULL);
@@ -691,10 +703,18 @@ CopyFrom(CopyFromState cstate)
 	 * Init copying process into foreign table. Initialization of copying into
 	 * foreign partitions will be done later.
 	 */
-	if (resultRelInfo->ri_FdwRoutine != NULL &&
-		resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
-		resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
-														 resultRelInfo);
+	if (target_resultRelInfo->ri_FdwRoutine != NULL)
+	{
+		if (target_resultRelInfo->ri_usesMultiInsert)
+		{
+			Assert(target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy != NULL);
+			target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate,
+																  resultRelInfo);
+		}
+		else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+			target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
+																	resultRelInfo);
+	}
 
 	/* Prepare to catch AFTER triggers. */
 	AfterTriggerBeginQuery();
@@ -1072,10 +1092,16 @@ CopyFrom(CopyFromState cstate)
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
 	/* Allow the FDW to shut down */
-	if (target_resultRelInfo->ri_FdwRoutine != NULL &&
-		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
-		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
-															  target_resultRelInfo);
+	if (target_resultRelInfo->ri_FdwRoutine != NULL)
+	{
+		if (target_resultRelInfo->ri_usesMultiInsert &&
+			target_resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL)
+			target_resultRelInfo->ri_FdwRoutine->EndForeignCopy(estate,
+														target_resultRelInfo);
+		else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+			target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
+														target_resultRelInfo);
+	}
 
 	/* Tear down the multi-insert buffer data */
 	CopyMultiInsertInfoCleanup(&multiInsertInfo);
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c7e5f04446..608bb3771d 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -50,6 +50,7 @@ typedef enum CopyDest
 	COPY_FILE,					/* to file (or a piped program) */
 	COPY_OLD_FE,				/* to frontend (2.0 protocol) */
 	COPY_NEW_FE,				/* to frontend (3.0 protocol) */
+	COPY_CALLBACK				/* to callback function */
 } CopyDest;
 
 /*
@@ -80,11 +81,14 @@ typedef struct CopyToStateData
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy to */
+	TupleDesc	tupDesc;		/* COPY TO will be used for manual tuple copying
+								  * into the destination */
 	QueryDesc  *queryDesc;		/* executable query to copy from */
 	List	   *attnumlist;		/* integer list of attnums to copy */
 	char	   *filename;		/* filename, or NULL for STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
 
+	copy_data_dest_cb data_dest_cb;	/* function for writing data */
 	CopyFormatOptions opts;
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
 
@@ -114,7 +118,6 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 static void EndCopy(CopyToState cstate);
 static void ClosePipeToProgram(CopyToState cstate);
 static uint64 CopyTo(CopyToState cstate);
-static void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
 static void CopyAttributeOutText(CopyToState cstate, char *string);
 static void CopyAttributeOutCSV(CopyToState cstate, char *string,
 								bool use_quote, bool single_attr);
@@ -286,6 +289,14 @@ CopySendEndOfRow(CopyToState cstate)
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
+		case COPY_CALLBACK:
+			Assert(!cstate->opts.binary);
+#ifndef WIN32
+			CopySendChar(cstate, '\n');
+#else
+			CopySendString(cstate, "\r\n");
+#endif
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
 	}
 
 	resetStringInfo(fe_msgbuf);
@@ -373,19 +384,24 @@ EndCopy(CopyToState cstate)
 CopyToState
 BeginCopyTo(ParseState *pstate,
 			Relation rel,
+			TupleDesc srcTupDesc,
 			RawStmt *raw_query,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
 	CopyToState	cstate;
-	bool		pipe = (filename == NULL);
+	bool		pipe = (filename == NULL) && (data_dest_cb == NULL);
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	MemoryContext oldcontext;
 
+	/* Impossible to mix CopyTo modes */
+	Assert(rel == NULL || srcTupDesc == NULL);
+
 	if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
 	{
 		if (rel->rd_rel->relkind == RELKIND_VIEW)
@@ -450,6 +466,11 @@ BeginCopyTo(ParseState *pstate,
 
 		tupDesc = RelationGetDescr(cstate->rel);
 	}
+	else if (srcTupDesc)
+	{
+		Assert(!raw_query);
+		tupDesc = cstate->tupDesc = srcTupDesc;
+	}
 	else
 	{
 		List	   *rewritten;
@@ -695,6 +716,11 @@ BeginCopyTo(ParseState *pstate,
 		if (whereToSendOutput != DestRemote)
 			cstate->copy_file = stdout;
 	}
+	else if (data_dest_cb)
+	{
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
 	else
 	{
 		cstate->filename = pstrdup(filename);
@@ -772,7 +798,7 @@ BeginCopyTo(ParseState *pstate,
 uint64
 DoCopyTo(CopyToState cstate)
 {
-	bool		pipe = (cstate->filename == NULL);
+	bool		pipe = (cstate->filename == NULL) && (cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	uint64		processed;
 
@@ -781,7 +807,9 @@ DoCopyTo(CopyToState cstate)
 		if (fe_copy)
 			SendCopyBegin(cstate);
 
+		CopyToStart(cstate);
 		processed = CopyTo(cstate);
+		CopyToFinish(cstate);
 
 		if (fe_copy)
 			SendCopyEnd(cstate);
@@ -821,18 +849,22 @@ EndCopyTo(CopyToState cstate)
 }
 
 /*
- * Copy from relation or query TO file.
+ * Start COPY TO operation.
+ * Separated to the routine to prevent duplicate operations in the case of
+ * manual mode, where tuples are copied to the destination one by one, by call of
+ * the CopyOneRowTo() routine.
  */
-static uint64
-CopyTo(CopyToState cstate)
+void
+CopyToStart(CopyToState cstate)
 {
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	ListCell   *cur;
-	uint64		processed;
 
 	if (cstate->rel)
 		tupDesc = RelationGetDescr(cstate->rel);
+	else if (cstate->tupDesc)
+		tupDesc = cstate->tupDesc;
 	else
 		tupDesc = cstate->queryDesc->tupDesc;
 	num_phys_attrs = tupDesc->natts;
@@ -919,6 +951,32 @@ CopyTo(CopyToState cstate)
 			CopySendEndOfRow(cstate);
 		}
 	}
+}
+
+/*
+ * Finish COPY TO operation.
+ */
+void
+CopyToFinish(CopyToState cstate)
+{
+	if (cstate->opts.binary)
+	{
+		/* Generate trailer for a binary copy */
+		CopySendInt16(cstate, -1);
+		/* Need to flush out the trailer */
+		CopySendEndOfRow(cstate);
+	}
+
+	MemoryContextDelete(cstate->rowcontext);
+}
+
+/*
+ * Copy from relation or query TO file.
+ */
+static uint64
+CopyTo(CopyToState cstate)
+{
+	uint64		processed;
 
 	if (cstate->rel)
 	{
@@ -951,23 +1009,13 @@ CopyTo(CopyToState cstate)
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
-	if (cstate->opts.binary)
-	{
-		/* Generate trailer for a binary copy */
-		CopySendInt16(cstate, -1);
-		/* Need to flush out the trailer */
-		CopySendEndOfRow(cstate);
-	}
-
-	MemoryContextDelete(cstate->rowcontext);
-
 	return processed;
 }
 
 /*
  * Emit one row during CopyTo().
  */
-static void
+void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
 	bool		need_delim = false;
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 9809c03a8e..a21d4d2fc1 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1294,8 +1294,12 @@ ExecSetRelationUsesMultiInsert(const ResultRelInfo *rri,
 		rri->ri_TrigDesc->trig_insert_new_table)
 		return false;
 
-	/* Foreign tables don't support multi-inserts. */
-	if (rri->ri_FdwRoutine != NULL)
+	if (rri->ri_FdwRoutine != NULL &&
+		rri->ri_FdwRoutine->ExecForeignCopy == NULL)
+		/*
+		 * Foreign tables don't support multi-inserts, unless their FDW
+		 * provides the necessary COPY interface.
+		 */
 		return false;
 
 	/* OK, caller can use multi-insert on this relation. */
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index 637e900b09..f3b9197db1 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -996,9 +996,16 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
 	 * If the partition is a foreign table, let the FDW init itself for
 	 * routing tuples to the partition.
 	 */
-	if (partRelInfo->ri_FdwRoutine != NULL &&
-		partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
-		partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
+	if (partRelInfo->ri_FdwRoutine != NULL)
+	{
+		if (partRelInfo->ri_usesMultiInsert)
+		{
+			Assert(partRelInfo->ri_FdwRoutine->BeginForeignCopy != NULL);
+			partRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate, partRelInfo);
+		}
+		else if (partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+			partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
+	}
 
 	partRelInfo->ri_CopyMultiInsertBuffer = NULL;
 
@@ -1199,10 +1206,16 @@ ExecCleanupTupleRouting(ModifyTableState *mtstate,
 		ResultRelInfo *resultRelInfo = proute->partitions[i];
 
 		/* Allow any FDWs to shut down */
-		if (resultRelInfo->ri_FdwRoutine != NULL &&
-			resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
-			resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
-														   resultRelInfo);
+		if (resultRelInfo->ri_FdwRoutine != NULL)
+		{
+			if (resultRelInfo->ri_usesMultiInsert &&
+				resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL)
+				resultRelInfo->ri_FdwRoutine->EndForeignCopy(mtstate->ps.state,
+															   resultRelInfo);
+			else if (resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+				resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
+															   resultRelInfo);
+		}
 
 		/*
 		 * Check if this result rel is one belonging to the node's subplans,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 127a3c61e2..01bb3e8ad4 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -55,6 +55,7 @@ typedef struct CopyFromStateData *CopyFromState;
 typedef struct CopyToStateData *CopyToState;
 
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *outbuf, int len);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
@@ -78,12 +79,17 @@ extern DestReceiver *CreateCopyDestReceiver(void);
 /*
  * internal prototypes
  */
-extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
+extern CopyToState BeginCopyTo(ParseState *pstate, Relation rel,
+							   TupleDesc tupDesc, RawStmt *query,
 							   Oid queryRelId, const char *filename, bool is_program,
+							   copy_data_dest_cb data_dest_cb,
 							   List *attnamelist, List *options);
 extern void EndCopyTo(CopyToState cstate);
 extern uint64 DoCopyTo(CopyToState cstate);
 extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
 							List *attnamelist);
+extern void CopyToStart(CopyToState cstate);
+extern void CopyToFinish(CopyToState cstate);
+extern void CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot);
 
 #endif							/* COPY_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..52b213f5aa 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -104,6 +104,16 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
 typedef void (*EndForeignInsert_function) (EState *estate,
 										   ResultRelInfo *rinfo);
 
+typedef void (*BeginForeignCopy_function) (ModifyTableState *mtstate,
+										   ResultRelInfo *rinfo);
+
+typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo,
+										  TupleTableSlot **slots,
+										  int nslots);
+
+typedef void (*EndForeignCopy_function) (EState *estate,
+										 ResultRelInfo *rinfo);
+
 typedef int (*IsForeignRelUpdatable_function) (Relation rel);
 
 typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
@@ -220,6 +230,11 @@ typedef struct FdwRoutine
 	IterateDirectModify_function IterateDirectModify;
 	EndDirectModify_function EndDirectModify;
 
+	/* Support functions for COPY into foreign tables */
+	BeginForeignCopy_function BeginForeignCopy;
+	ExecForeignCopy_function ExecForeignCopy;
+	EndForeignCopy_function EndForeignCopy;
+
 	/* Functions for SELECT FOR UPDATE/SHARE row locking */
 	GetForeignRowMarkType_function GetForeignRowMarkType;
 	RefetchForeignRow_function RefetchForeignRow;
-- 
2.25.1

Reply via email to