Thanks Andrey for the patch. I am glad that the patch already takes care
of some corner cases, but there are still more.

The COPY command that the patch constructs doesn't take care of dropped
columns. There is code in deparseAnalyzeSql which constructs the list of
columns for a given foreign relation. The 0002 patch attached here moves
that code to a separate function and reuses it for COPY. If you find that
code change useful, please include it in the main patch.
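
To illustrate the idea outside of postgres_fdw, here is a minimal
standalone sketch of building a column list that skips dropped columns.
DemoColumn and demo_append_live_columns() are hypothetical names made up
for this example; the real code walks the relation's tuple descriptor,
honours the column_name option and quotes each identifier, all of which
the sketch leaves out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for one attribute of a tuple descriptor. */
typedef struct DemoColumn
{
	const char *remote_name;	/* name to use on the foreign server */
	bool		is_dropped;		/* true if the column was dropped locally */
} DemoColumn;

/*
 * Append the comma-separated names of all non-dropped columns to the
 * NUL-terminated string in "out". Returns the number of columns emitted,
 * or -1 if "out" is too small. Identifier quoting is omitted here.
 */
static int
demo_append_live_columns(char *out, size_t outsz,
						 const DemoColumn *cols, int ncols)
{
	int			emitted = 0;
	size_t		used;

	assert(out != NULL && outsz > 0);
	used = strlen(out);
	assert(used < outsz);

	for (int i = 0; i < ncols; i++)
	{
		int			n;

		/* Ignore dropped columns. */
		if (cols[i].is_dropped)
			continue;

		n = snprintf(out + used, outsz - used, "%s%s",
					 emitted > 0 ? ", " : "", cols[i].remote_name);
		if (n < 0 || (size_t) n >= outsz - used)
			return -1;

		used += (size_t) n;
		emitted++;
	}

	return emitted;
}
```

The return value doubles as a "was anything emitted" flag, so a caller
can decide whether to wrap the list in parentheses or omit it entirely.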

While working on that, I found two issues:
1. The COPY command was constructed with an empty column list when there
were no non-dropped columns in the relation. This caused a syntax error.
Fixed that in 0002.
2. In the same case, if the foreign table declared locally doesn't have
any non-dropped columns but the relation that it refers to on the
foreign server has some non-dropped columns, the COPY command fails. I
added a test case for this in 0002 but haven't fixed it.
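
As a rough standalone sketch of the fix for the first issue, the
statement builder can drop the parentheses altogether when the column
list is empty. demo_build_copy_from() is a hypothetical helper written
for this example, not the function in the patch. Note that a COPY
without a column list targets all columns of the remote table, which is
presumably why the second case still fails when the remote relation has
columns that the local one lacks.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

/*
 * Build "COPY rel(col, ...) FROM STDIN", or "COPY rel FROM STDIN" when
 * "collist" is empty, so that an empty "()" is never generated.
 * Returns 0 on success, -1 if "out" is too small.
 */
static int
demo_build_copy_from(char *out, size_t outsz,
					 const char *relname, const char *collist)
{
	int			n;

	assert(out != NULL && relname != NULL && collist != NULL);

	if (collist[0] == '\0')
		n = snprintf(out, outsz, "COPY %s FROM STDIN", relname);
	else
		n = snprintf(out, outsz, "COPY %s(%s) FROM STDIN",
					 relname, collist);

	return (n < 0 || (size_t) n >= outsz) ? -1 : 0;
}
```

With a non-empty list this yields the same shape as the updated expected
output in 0002, e.g. "COPY public.loc2(f1, f2) FROM STDIN".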

I think this work is useful. Please add it to the next commitfest so
that it's tracked.

On Tue, Jun 2, 2020 at 11:21 AM Andrey Lepikhov
<a.lepik...@postgrespro.ru> wrote:
>
> Thank you for the answer,
>
> 02.06.2020 05:02, Etsuro Fujita wrote:
> > I think I also thought something similar to this before [1].  Will take a 
> > look.
>
> > [1] 
> > https://www.postgresql.org/message-id/23990375-45a6-5823-b0aa-a6a7a6a957f0%40lab.ntt.co.jp
> >
> I have looked into the thread.
> My first version of the patch was like your idea. But while developing
> the “COPY FROM” code, I discovered the following constraints:
> 1. Two or more partitions can be placed on the same node. We need to
> finish the COPY into one partition before starting the COPY into another
> partition on the same node.
> 2. On any error we need to send EOF to all started "COPY .. FROM STDIN"
> operations. Otherwise the FDW can't cancel the operation.
>
> Hiding the COPY code under the buffer management machinery allows us to
> generalize that machinery, execute one COPY operation per buffer,
> and simplify error handling.
>
> As I understand it, the main idea of the thread you mentioned is to add
> "COPY FROM" support without changes to the FDW API.
> It is possible to remove BeginForeignCopy() and EndForeignCopy() from
> the patch, but it is not trivial to change ExecForeignInsert() for
> COPY purposes.
> All that I can offer here for now is to introduce one new
> ExecForeignBulkInsert(buf) routine that will execute a single "COPY FROM
> STDIN" operation, send the tuples, and close the operation. We can use the
> ExecForeignInsert() routine for each buffered tuple if
> ExecForeignBulkInsert() is not supported.
>
> One of the main questions here is whether to use the COPY TO machinery
> for serializing a tuple. That requires (as you will see if you look at
> the patch) transforming the CopyTo() routine into an iterative form:
> start/next/finish. Would that be acceptable?
>
> Attached is a patch that corrects a silly mistake.
>
> --
> Andrey Lepikhov
> Postgres Professional
> https://postgrespro.com
> The Russian Postgres Company



-- 
Best Wishes,
Ashutosh Bapat
From 9c4e09bd03cb98b1f84c42c34ce7b76e0a87011c Mon Sep 17 00:00:00 2001
From: "Andrey V. Lepikhov" <a.lepik...@postgrespro.ru>
Date: Fri, 29 May 2020 10:39:57 +0500
Subject: [PATCH 1/2] Fast COPY FROM into the foreign (or sharded) table.

---
 contrib/postgres_fdw/deparse.c                |  25 ++
 .../postgres_fdw/expected/postgres_fdw.out    |   5 +-
 contrib/postgres_fdw/postgres_fdw.c           |  95 ++++++++
 contrib/postgres_fdw/postgres_fdw.h           |   1 +
 src/backend/commands/copy.c                   | 213 ++++++++++++------
 src/include/commands/copy.h                   |   5 +
 src/include/foreign/fdwapi.h                  |   9 +
 7 files changed, 286 insertions(+), 67 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ad37a74221..427402c8eb 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1758,6 +1758,31 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 						 withCheckOptionList, returningList, retrieved_attrs);
 }
 
+/*
+ * Deparse COPY FROM into given buf.
+ * We need to use list of parameters at each query.
+ */
+void
+deparseCopyFromSql(StringInfo buf, Relation rel)
+{
+	int attnum;
+
+	appendStringInfoString(buf, "COPY ");
+	deparseRelation(buf, rel);
+	appendStringInfoString(buf, " ( ");
+
+	for(attnum = 0; attnum < rel->rd_att->natts; attnum++)
+	{
+		appendStringInfoString(buf, NameStr(rel->rd_att->attrs[attnum].attname));
+
+		if (attnum != rel->rd_att->natts-1)
+			appendStringInfoString(buf, ", ");
+	}
+
+	appendStringInfoString(buf, " ) ");
+	appendStringInfoString(buf, " FROM STDIN ");
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 82fc1290ef..922c08d2dc 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8063,8 +8063,9 @@ copy rem2 from stdin;
 copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
-CONTEXT:  remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
-COPY rem2, line 1: "-1	xyzzy"
+CONTEXT:  COPY loc2, line 1: "-1	xyzzy"
+remote SQL command: COPY public.loc2 ( f1, f2 )  FROM STDIN 
+COPY rem2, line 2
 select * from rem2;
  f1 | f2  
 ----+-----
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..bd2a8f596f 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,7 @@
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "catalog/pg_class.h"
+#include "commands/copy.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
@@ -190,6 +191,7 @@ typedef struct PgFdwModifyState
 	/* for update row movement if subplan result rel */
 	struct PgFdwModifyState *aux_fmstate;	/* foreign-insert state, if
 											 * created */
+	CopyState fdwcstate;
 } PgFdwModifyState;
 
 /*
@@ -350,12 +352,16 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate,
 												 ResultRelInfo *resultRelInfo,
 												 TupleTableSlot *slot,
 												 TupleTableSlot *planSlot);
+static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
+												 TupleTableSlot *slot);
 static void postgresEndForeignModify(EState *estate,
 									 ResultRelInfo *resultRelInfo);
 static void postgresBeginForeignInsert(ModifyTableState *mtstate,
 									   ResultRelInfo *resultRelInfo);
 static void postgresEndForeignInsert(EState *estate,
 									 ResultRelInfo *resultRelInfo);
+static void postgresBeginForeignCopy(ResultRelInfo *resultRelInfo);
+static void postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status);
 static int	postgresIsForeignRelUpdatable(Relation rel);
 static bool postgresPlanDirectModify(PlannerInfo *root,
 									 ModifyTable *plan,
@@ -530,9 +536,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
 	routine->ExecForeignInsert = postgresExecForeignInsert;
 	routine->ExecForeignUpdate = postgresExecForeignUpdate;
 	routine->ExecForeignDelete = postgresExecForeignDelete;
+	routine->ExecForeignCopy = postgresExecForeignCopy;
 	routine->EndForeignModify = postgresEndForeignModify;
 	routine->BeginForeignInsert = postgresBeginForeignInsert;
 	routine->EndForeignInsert = postgresEndForeignInsert;
+	routine->BeginForeignCopy = postgresBeginForeignCopy;
+	routine->EndForeignCopy = postgresEndForeignCopy;
 	routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
 	routine->PlanDirectModify = postgresPlanDirectModify;
 	routine->BeginDirectModify = postgresBeginDirectModify;
@@ -1890,6 +1899,27 @@ postgresExecForeignDelete(EState *estate,
 								  slot, planSlot);
 }
 
+/*
+ * postgresExecForeignCopy
+ *		Copy one row into a foreign table
+ */
+static void
+postgresExecForeignCopy(ResultRelInfo *resultRelInfo, TupleTableSlot *slot)
+{
+	PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
+	char *buf;
+
+	buf = NextForeignCopyRow(fmstate->fdwcstate, slot);
+
+	if (PQputCopyData(fmstate->conn, buf, strlen(buf)) <= 0)
+	{
+		PGresult *res;
+
+		res = PQgetResult(fmstate->conn);
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
+			pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query);
+	}
+}
 /*
  * postgresEndForeignModify
  *		Finish an insert/update/delete operation on a foreign table
@@ -2051,6 +2081,71 @@ postgresEndForeignInsert(EState *estate,
 	finish_foreign_modify(fmstate);
 }
 
+/*
+ * postgresBeginForeignCopy
+ *		Begin a COPY operation on a foreign table
+ */
+static void
+postgresBeginForeignCopy(ResultRelInfo *resultRelInfo)
+{
+	Relation rel = resultRelInfo->ri_RelationDesc;
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) (resultRelInfo->ri_FdwState);
+	StringInfoData sql;
+	PGresult *res;
+
+	Assert(resultRelInfo->ri_FdwRoutine != NULL);
+
+	fmstate->target_attrs = NULL;
+	fmstate->has_returning = false;
+	fmstate->retrieved_attrs = NULL;
+
+	if (fmstate->fdwcstate == NULL)
+		fmstate->fdwcstate = BeginForeignCopyTo(rel);
+
+	initStringInfo(&sql);
+	deparseCopyFromSql(&sql, rel);
+	fmstate->query = sql.data;
+
+	res = PQexec(fmstate->conn, fmstate->query);
+}
+
+/*
+ * postgresEndForeignCopy
+ *		Finish a COPY operation on a foreign table
+ */
+static void
+postgresEndForeignCopy(ResultRelInfo *resultRelInfo, bool status)
+{
+	PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+	PGresult *res;
+
+	Assert(fmstate != NULL);
+
+	if (!status)
+	{
+		PQputCopyEnd(fmstate->conn, (PQprotocolVersion(fmstate->conn) < 3) ?
+					NULL :
+					_("aborted foreign copy"));
+		EndForeignCopyTo(fmstate->fdwcstate);
+		pfree(fmstate->fdwcstate);
+		fmstate->fdwcstate = NULL;
+		return;
+	}
+
+	while (res = PQgetResult(fmstate->conn), PQresultStatus(res) == PGRES_COPY_IN)
+	{
+		/* We can't send an error message if we're using protocol version 2 */
+		PQputCopyEnd(fmstate->conn, (status || PQprotocolVersion(fmstate->conn) < 3) ? NULL :
+					 _("aborted foreign copy"));
+		PQclear(res);
+	}
+
+	if (PQresultStatus(res) != PGRES_COMMAND_OK)
+		pgfdw_report_error(ERROR, res, fmstate->conn, false, fmstate->query);
+
+	while (PQgetResult(fmstate->conn) != NULL);
+}
+
 /*
  * postgresIsForeignRelUpdatable
  *		Determine whether a foreign table supports INSERT, UPDATE and/or
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..8fc5ff018f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
 							 List *targetAttrs, bool doNothing,
 							 List *withCheckOptionList, List *returningList,
 							 List **retrieved_attrs);
+extern void deparseCopyFromSql(StringInfo buf, Relation rel);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
 							 Index rtindex, Relation rel,
 							 List *targetAttrs,
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6d53dc463c..87e0f46846 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -133,6 +133,7 @@ typedef struct CopyStateData
 	char	   *filename;		/* filename, or NULL for STDIN/STDOUT */
 	bool		is_program;		/* is 'filename' a program to popen? */
 	copy_data_source_cb data_source_cb; /* function for reading data */
+	copy_data_dest_cb data_dest_cb;
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
 	bool		csv_mode;		/* Comma Separated Value format? */
@@ -358,8 +359,11 @@ static void EndCopy(CopyState cstate);
 static void ClosePipeToProgram(CopyState cstate);
 static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
 							 Oid queryRelId, const char *filename, bool is_program,
-							 List *attnamelist, List *options);
+							 copy_data_dest_cb data_dest_cb, List *attnamelist,
+							 List *options);
 static void EndCopyTo(CopyState cstate);
+static void CopyToStart(CopyState cstate);
+static void CopyToFinish(CopyState cstate);
 static uint64 DoCopyTo(CopyState cstate);
 static uint64 CopyTo(CopyState cstate);
 static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
@@ -587,7 +591,9 @@ CopySendEndOfRow(CopyState cstate)
 			(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
 			break;
 		case COPY_CALLBACK:
-			Assert(false);		/* Not yet supported. */
+			CopySendChar(cstate, '\n');
+			CopySendChar(cstate, '\0');
+			cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
 			break;
 	}
 
@@ -1075,7 +1081,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	else
 	{
 		cstate = BeginCopyTo(pstate, rel, query, relid,
-							 stmt->filename, stmt->is_program,
+							 stmt->filename, stmt->is_program, NULL,
 							 stmt->attlist, stmt->options);
 		*processed = DoCopyTo(cstate);	/* copy from database to file */
 		EndCopyTo(cstate);
@@ -1815,6 +1821,32 @@ EndCopy(CopyState cstate)
 	pfree(cstate);
 }
 
+static char *buf = NULL;
+static void
+data_dest_cb(void *outbuf, int len)
+{
+	buf = (char *) palloc(len);
+	memcpy(buf, (char *) outbuf, len);
+}
+
+CopyState
+BeginForeignCopyTo(Relation rel)
+{
+	CopyState cstate;
+
+	cstate = BeginCopy(NULL, false, rel, NULL, InvalidOid, NIL, NIL);
+	cstate->copy_dest = COPY_CALLBACK;
+	cstate->data_dest_cb = data_dest_cb;
+	CopyToStart(cstate);
+	return cstate;
+}
+
+void
+EndForeignCopyTo(CopyState cstate)
+{
+	CopyToFinish(cstate);
+}
+
 /*
  * Setup CopyState to read tuples from a table or a query for COPY TO.
  */
@@ -1825,6 +1857,7 @@ BeginCopyTo(ParseState *pstate,
 			Oid queryRelId,
 			const char *filename,
 			bool is_program,
+			copy_data_dest_cb data_dest_cb,
 			List *attnamelist,
 			List *options)
 {
@@ -1880,6 +1913,11 @@ BeginCopyTo(ParseState *pstate,
 		if (whereToSendOutput != DestRemote)
 			cstate->copy_file = stdout;
 	}
+	else if (data_dest_cb)
+	{
+		cstate->copy_dest = COPY_CALLBACK;
+		cstate->data_dest_cb = data_dest_cb;
+	}
 	else
 	{
 		cstate->filename = pstrdup(filename);
@@ -1950,6 +1988,13 @@ BeginCopyTo(ParseState *pstate,
 	return cstate;
 }
 
+char *
+NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot)
+{
+	CopyOneRowTo(cstate, slot);
+	return buf;
+}
+
 /*
  * This intermediate routine exists mainly to localize the effects of setjmp
  * so we don't need to plaster a lot of variables with "volatile".
@@ -1966,7 +2011,9 @@ DoCopyTo(CopyState cstate)
 		if (fe_copy)
 			SendCopyBegin(cstate);
 
+		CopyToStart(cstate);
 		processed = CopyTo(cstate);
+		CopyToFinish(cstate);
 
 		if (fe_copy)
 			SendCopyEnd(cstate);
@@ -2005,16 +2052,12 @@ EndCopyTo(CopyState cstate)
 	EndCopy(cstate);
 }
 
-/*
- * Copy from relation or query TO file.
- */
-static uint64
-CopyTo(CopyState cstate)
+static void
+CopyToStart(CopyState cstate)
 {
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	ListCell   *cur;
-	uint64		processed;
 
 	if (cstate->rel)
 		tupDesc = RelationGetDescr(cstate->rel);
@@ -2104,6 +2147,29 @@ CopyTo(CopyState cstate)
 			CopySendEndOfRow(cstate);
 		}
 	}
+}
+
+static void
+CopyToFinish(CopyState cstate)
+{
+	if (cstate->binary)
+	{
+		/* Generate trailer for a binary copy */
+		CopySendInt16(cstate, -1);
+		/* Need to flush out the trailer */
+		CopySendEndOfRow(cstate);
+	}
+
+	MemoryContextDelete(cstate->rowcontext);
+}
+
+/*
+ * Copy from relation or query TO file.
+ */
+static uint64
+CopyTo(CopyState cstate)
+{
+	uint64		processed;
 
 	if (cstate->rel)
 	{
@@ -2135,17 +2201,6 @@ CopyTo(CopyState cstate)
 		ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
-
-	if (cstate->binary)
-	{
-		/* Generate trailer for a binary copy */
-		CopySendInt16(cstate, -1);
-		/* Need to flush out the trailer */
-		CopySendEndOfRow(cstate);
-	}
-
-	MemoryContextDelete(cstate->rowcontext);
-
 	return processed;
 }
 
@@ -2449,53 +2504,82 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	cstate->line_buf_valid = false;
 	save_cur_lineno = cstate->cur_lineno;
 
-	/*
-	 * table_multi_insert may leak memory, so switch to short-lived memory
-	 * context before calling it.
-	 */
-	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	table_multi_insert(resultRelInfo->ri_RelationDesc,
-					   slots,
-					   nused,
-					   mycid,
-					   ti_options,
-					   buffer->bistate);
-	MemoryContextSwitchTo(oldcontext);
-
-	for (i = 0; i < nused; i++)
+	if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
 	{
-		/*
-		 * If there are any indexes, update them for all the inserted tuples,
-		 * and run AFTER ROW INSERT triggers.
-		 */
-		if (resultRelInfo->ri_NumIndices > 0)
+		/* Flush into foreign table or partition */
+		int i;
+		bool status = false;
+
+		Assert(resultRelInfo->ri_FdwRoutine != NULL &&
+			   resultRelInfo->ri_FdwState != NULL);
+
+		PG_TRY();
 		{
-			List	   *recheckIndexes;
-
-			cstate->cur_lineno = buffer->linenos[i];
-			recheckIndexes =
-				ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
-									  NIL);
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], recheckIndexes,
-								 cstate->transition_capture);
-			list_free(recheckIndexes);
+			resultRelInfo->ri_FdwRoutine->BeginForeignCopy(resultRelInfo);
+			for (i = 0; i < nused; i++)
+				resultRelInfo->ri_FdwRoutine->ExecForeignCopy(resultRelInfo,
+															  slots[i]);
+			status = true;
 		}
-
+		PG_FINALLY();
+		{
+			resultRelInfo->ri_FdwRoutine->EndForeignCopy(
+														buffer->resultRelInfo,
+														status);
+		}
+		PG_END_TRY();
+	}
+	else
+	{
 		/*
-		 * There's no indexes, but see if we need to run AFTER ROW INSERT
-		 * triggers anyway.
+		 * table_multi_insert may leak memory, so switch to short-lived memory
+		 * context before calling it.
 		 */
-		else if (resultRelInfo->ri_TrigDesc != NULL &&
-				 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
-				  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+		oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+		table_multi_insert(resultRelInfo->ri_RelationDesc,
+						   slots,
+						   nused,
+						   mycid,
+						   ti_options,
+						   buffer->bistate);
+		MemoryContextSwitchTo(oldcontext);
+
+		for (i = 0; i < nused; i++)
 		{
-			cstate->cur_lineno = buffer->linenos[i];
-			ExecARInsertTriggers(estate, resultRelInfo,
-								 slots[i], NIL, cstate->transition_capture);
-		}
+			/*
+			 * If there are any indexes, update them for all the inserted tuples,
+			 * and run AFTER ROW INSERT triggers.
+			 */
+			if (resultRelInfo->ri_NumIndices > 0)
+			{
+				List	   *recheckIndexes;
+
+				cstate->cur_lineno = buffer->linenos[i];
+				recheckIndexes =
+					ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+										  NIL);
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], recheckIndexes,
+									 cstate->transition_capture);
+				list_free(recheckIndexes);
+			}
 
-		ExecClearTuple(slots[i]);
+			/*
+			 * There's no indexes, but see if we need to run AFTER ROW INSERT
+			 * triggers anyway.
+			 */
+			else if (resultRelInfo->ri_TrigDesc != NULL &&
+					 (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+					  resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+			{
+				cstate->cur_lineno = buffer->linenos[i];
+				ExecARInsertTriggers(estate, resultRelInfo,
+									 slots[i], NIL, cstate->transition_capture);
+			}
+
+			ExecClearTuple(slots[i]);
+		}
 	}
 
 	/* Mark that all slots are free */
@@ -2868,8 +2952,7 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
-	else if (resultRelInfo->ri_FdwRoutine != NULL ||
-			 cstate->volatile_defexprs)
+	else if (cstate->volatile_defexprs)
 	{
 		/*
 		 * Can't support multi-inserts to foreign tables or if there are any
@@ -3037,8 +3120,7 @@ CopyFrom(CopyState cstate)
 				 */
 				leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
 					!has_before_insert_row_trig &&
-					!has_instead_insert_row_trig &&
-					resultRelInfo->ri_FdwRoutine == NULL;
+					!has_instead_insert_row_trig;
 
 				/* Set the multi-insert buffer to use for this partition. */
 				if (leafpart_use_multi_insert)
@@ -3048,7 +3130,8 @@ CopyFrom(CopyState cstate)
 													   resultRelInfo);
 				}
 				else if (insertMethod == CIM_MULTI_CONDITIONAL &&
-						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+						 !CopyMultiInsertInfoIsEmpty(&multiInsertInfo) &&
+						 resultRelInfo->ri_FdwRoutine == NULL)
 				{
 					/*
 					 * Flush pending inserts if this partition can't use
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..ef119a761a 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -22,6 +22,7 @@
 /* CopyStateData is private in commands/copy.c */
 typedef struct CopyStateData *CopyState;
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *outbuf, int len);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
@@ -41,4 +42,8 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern CopyState BeginForeignCopyTo(Relation rel);
+extern char *NextForeignCopyRow(CopyState cstate, TupleTableSlot *slot);
+extern void EndForeignCopyTo(CopyState cstate);
+
 #endif							/* COPY_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..197301c5a5 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -94,6 +94,8 @@ typedef TupleTableSlot *(*ExecForeignDelete_function) (EState *estate,
 													   ResultRelInfo *rinfo,
 													   TupleTableSlot *slot,
 													   TupleTableSlot *planSlot);
+typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo,
+										  TupleTableSlot *slot);
 
 typedef void (*EndForeignModify_function) (EState *estate,
 										   ResultRelInfo *rinfo);
@@ -104,6 +106,10 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
 typedef void (*EndForeignInsert_function) (EState *estate,
 										   ResultRelInfo *rinfo);
 
+typedef void (*BeginForeignCopy_function) (ResultRelInfo *rinfo);
+
+typedef void (*EndForeignCopy_function) (ResultRelInfo *rinfo, bool status);
+
 typedef int (*IsForeignRelUpdatable_function) (Relation rel);
 
 typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
@@ -211,9 +217,12 @@ typedef struct FdwRoutine
 	ExecForeignInsert_function ExecForeignInsert;
 	ExecForeignUpdate_function ExecForeignUpdate;
 	ExecForeignDelete_function ExecForeignDelete;
+	ExecForeignCopy_function ExecForeignCopy;
 	EndForeignModify_function EndForeignModify;
 	BeginForeignInsert_function BeginForeignInsert;
 	EndForeignInsert_function EndForeignInsert;
+	BeginForeignCopy_function BeginForeignCopy;
+	EndForeignCopy_function EndForeignCopy;
 	IsForeignRelUpdatable_function IsForeignRelUpdatable;
 	PlanDirectModify_function PlanDirectModify;
 	BeginDirectModify_function BeginDirectModify;
-- 
2.17.1

From 79b37b9160572a9d730c97d9fc1f471064b26c7e Mon Sep 17 00:00:00 2001
From: Ashutosh Bapat <ashutosh.ba...@2ndquadrant.com>
Date: Mon, 15 Jun 2020 10:43:05 +0530
Subject: [PATCH 2/2] Separate code to list all columns of a foreign relation
 into its own function

This code resides in deparseAnalyzeSql() but is useful for COPY as well.
So separate it into its own function. This takes care of dropped columns
and of the no-columns case.

However, the COPY command constructed for a relation which doesn't have
any non-dropped columns locally but has some columns on the remote server
has a syntax error. This needs to be fixed.

Ashutosh Bapat
---
 contrib/postgres_fdw/deparse.c                | 58 +++++++++++--------
 .../postgres_fdw/expected/postgres_fdw.out    | 16 ++++-
 contrib/postgres_fdw/sql/postgres_fdw.sql     | 19 ++++++
 3 files changed, 69 insertions(+), 24 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index 427402c8eb..aa06342bfa 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -184,6 +184,8 @@ static void appendAggOrderBy(List *orderList, List *targetList,
 static void appendFunctionName(Oid funcid, deparse_expr_cxt *context);
 static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno,
 									deparse_expr_cxt *context);
+static List *deparseRelColumnList(StringInfo buf, Relation rel,
+								  bool enclose_in_parens);
 
 /*
  * Helper functions
@@ -1769,17 +1771,7 @@ deparseCopyFromSql(StringInfo buf, Relation rel)
 
 	appendStringInfoString(buf, "COPY ");
 	deparseRelation(buf, rel);
-	appendStringInfoString(buf, " ( ");
-
-	for(attnum = 0; attnum < rel->rd_att->natts; attnum++)
-	{
-		appendStringInfoString(buf, NameStr(rel->rd_att->attrs[attnum].attname));
-
-		if (attnum != rel->rd_att->natts-1)
-			appendStringInfoString(buf, ", ");
-	}
-
-	appendStringInfoString(buf, " ) ");
+	(void) deparseRelColumnList(buf, rel, true);
 	appendStringInfoString(buf, " FROM STDIN ");
 }
 
@@ -2086,6 +2078,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
  */
 void
 deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
+{
+	appendStringInfoString(buf, "SELECT ");
+	*retrieved_attrs = deparseRelColumnList(buf, rel, false);
+
+	/* Don't generate bad syntax for zero-column relation. */
+	if (list_length(*retrieved_attrs) == 0)
+		appendStringInfoString(buf, "NULL");
+
+	/*
+	 * Construct FROM clause
+	 */
+	appendStringInfoString(buf, " FROM ");
+	deparseRelation(buf, rel);
+}
+
+/*
+ * Construct the list of columns of given foreign relation in the order they
+ * appear in the tuple descriptor of the relation. Ignore any dropped columns.
+ * Use column names on the foreign server instead of local names.
+ *
+ * Optionally enclose the list in parentheses.
+ */
+static List *
+deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens)
 {
 	Oid			relid = RelationGetRelid(rel);
 	TupleDesc	tupdesc = RelationGetDescr(rel);
@@ -2094,10 +2110,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 	List	   *options;
 	ListCell   *lc;
 	bool		first = true;
+	List	   *retrieved_attrs = NIL;
 
-	*retrieved_attrs = NIL;
-
-	appendStringInfoString(buf, "SELECT ");
 	for (i = 0; i < tupdesc->natts; i++)
 	{
 		/* Ignore dropped columns. */
@@ -2106,6 +2120,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 
 		if (!first)
 			appendStringInfoString(buf, ", ");
+		else if (enclose_in_parens)
+			appendStringInfoChar(buf, '(');
+
 		first = false;
 
 		/* Use attribute name or column_name option. */
@@ -2125,18 +2142,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
 
 		appendStringInfoString(buf, quote_identifier(colname));
 
-		*retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
+		retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
 	}
 
-	/* Don't generate bad syntax for zero-column relation. */
-	if (first)
-		appendStringInfoString(buf, "NULL");
+	if (enclose_in_parens && list_length(retrieved_attrs) > 0)
+		appendStringInfoChar(buf, ')');
 
-	/*
-	 * Construct FROM clause
-	 */
-	appendStringInfoString(buf, " FROM ");
-	deparseRelation(buf, rel);
+	return retrieved_attrs;
 }
 
 /*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 922c08d2dc..e4201a7779 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8064,7 +8064,7 @@ copy rem2 from stdin; -- ERROR
 ERROR:  new row for relation "loc2" violates check constraint "loc2_f1positive"
 DETAIL:  Failing row contains (-1, xyzzy).
 CONTEXT:  COPY loc2, line 1: "-1	xyzzy"
-remote SQL command: COPY public.loc2 ( f1, f2 )  FROM STDIN 
+remote SQL command: COPY public.loc2(f1, f2) FROM STDIN 
 COPY rem2, line 2
 select * from rem2;
  f1 | f2  
@@ -8184,6 +8184,20 @@ drop trigger rem2_trig_row_before on rem2;
 drop trigger rem2_trig_row_after on rem2;
 drop trigger loc2_trig_row_before_insert on loc2;
 delete from rem2;
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+-- this will error because of data and column mismatch
+-- FIXME
+copy rem2 from stdin;
+select * from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(2 rows)
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 83971665e3..37f0c61183 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2293,6 +2293,25 @@ drop trigger loc2_trig_row_before_insert on loc2;
 
 delete from rem2;
 
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+-- this will error because of data and column mismatch
+-- FIXME
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
 -- test COPY FROM with foreign table created in the same transaction
 create table loc3 (f1 int, f2 text);
 begin;
-- 
2.17.1
