On 9/9/20 5:51 PM, Amit Langote wrote:
On Wed, Sep 9, 2020 at 6:42 PM Alexey Kondratov
<a.kondra...@postgrespro.ru> wrote:
On 2020-09-09 11:45, Andrey V. Lepikhov wrote:
This does not seem very convenient and will lead to errors in the
future. So, I agree with Amit.
And InitResultRelInfo() may set ri_usesMultiInsert to false by default,
since it's used only by COPY now. Then you won't need this in several
places:
+ resultRelInfo->ri_usesMultiInsert = false;
While the logic of turning multi-insert on with all the validations
required could be factored out of InitResultRelInfo() to a separate
routine.
Interesting idea. Maybe better to have a separate routine like Alexey says.
Ok. I rewrited the patch 0001 with the Alexey suggestion.
Patch 0002... required minor changes (new version see in attachment).
Also I added some optimization (see 0003 and 0004 patches). Here we
execute 'COPY .. FROM STDIN' at foreign server only once, in the
BeginForeignCopy routine. It is a proof-of-concept patches.
Also I see that error messages processing needs to be rewritten. Unlike
the INSERT operation applied to each row, here we find out copy errors
only after sending the END of copy. Currently implementations 0002 and
0004 provide uninformative error messages for some cases.
--
regards,
Andrey Lepikhov
Postgres Professional
>From 2053ac530db87ae4617aa953142c447e0b27e3a2 Mon Sep 17 00:00:00 2001
From: amitlan <amitlangot...@gmail.com>
Date: Mon, 24 Aug 2020 15:08:37 +0900
Subject: [PATCH 1/4] Move multi-insert decision logic into executor
When 0d5f05cde introduced support for using multi-insert mode when
copying into partitioned tables, it introduced single variable of
enum type CopyInsertMethod shared across all potential target
relations (partitions) that, along with some target relation
proprties, dictated whether to engage multi-insert mode for a given
target relation.
Move that decision logic into InitResultRelInfo which now sets a new
boolean field ri_usesMultiInsert of ResultRelInfo when a target
relation is first initialized. That prevents repeated computation
of the same information in some cases, especially for partitions,
and the new arrangement results in slightly more readability.
---
src/backend/commands/copy.c | 190 ++++++++++-----------------
src/backend/executor/execMain.c | 3 +
src/backend/executor/execPartition.c | 47 +++++++
src/include/executor/execPartition.h | 2 +
src/include/nodes/execnodes.h | 9 +-
5 files changed, 131 insertions(+), 120 deletions(-)
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index db7d24a511..2119db4213 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -85,16 +85,6 @@ typedef enum EolType
EOL_CRNL
} EolType;
-/*
- * Represents the heap insert method to be used during COPY FROM.
- */
-typedef enum CopyInsertMethod
-{
- CIM_SINGLE, /* use table_tuple_insert or fdw routine */
- CIM_MULTI, /* always use table_multi_insert */
- CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
-} CopyInsertMethod;
-
/*
* This struct contains all the state variables used throughout a COPY
* operation. For simplicity, we use the same struct for all variants of COPY,
@@ -2715,12 +2705,10 @@ CopyFrom(CopyState cstate)
CommandId mycid = GetCurrentCommandId(true);
int ti_options = 0; /* start with default options for insert */
BulkInsertState bistate = NULL;
- CopyInsertMethod insertMethod;
CopyMultiInsertInfo multiInsertInfo = {0}; /* pacify compiler */
uint64 processed = 0;
bool has_before_insert_row_trig;
bool has_instead_insert_row_trig;
- bool leafpart_use_multi_insert = false;
Assert(cstate->rel);
@@ -2833,6 +2821,58 @@ CopyFrom(CopyState cstate)
0);
target_resultRelInfo = resultRelInfo;
+ Assert(target_resultRelInfo->ri_usesMultiInsert == false);
+
+ /*
+ * It's generally more efficient to prepare a bunch of tuples for
+ * insertion, and insert them in bulk, for example, with one
+ * table_multi_insert() call than call table_tuple_insert() separately
+ * for every tuple. However, there are a number of reasons why we might
+ * not be able to do this. We check some conditions below while some
+ * other target relation properties are checked in InitResultRelInfo().
+ * Partition initialization will use result of this check implicitly as
+ * the ri_usesMultiInsert value of the parent relation.
+ */
+ if (!checkMultiInsertMode(target_resultRelInfo, NULL))
+ {
+ /*
+ * Do nothing. Can't allow multi-insert mode if previous conditions
+ * checking disallow this.
+ */
+ }
+ else if (cstate->volatile_defexprs || list_length(cstate->attnumlist) == 0)
+ {
+ /*
+ * Can't support bufferization of copy into foreign tables without any
+ * defined columns or if there are any volatile default expressions in the
+ * table. Similarly to the trigger case above, such expressions may query
+ * the table we're inserting into.
+ *
+ * Note: It does not matter if any partitions have any volatile
+ * default expressions as we use the defaults from the target of the
+ * COPY command.
+ */
+ }
+ else if (contain_volatile_functions(cstate->whereClause))
+ {
+ /*
+ * Can't support multi-inserts if there are any volatile function
+ * expressions in WHERE clause. Similarly to the trigger case above,
+ * such expressions may query the table we're inserting into.
+ */
+ }
+ else
+ {
+ /*
+ * Looks okay to try multi-insert.
+ *
+ * For partitioned tables, whether or not to use multi-insert depends
+ * on the individual parition's properties which are also checked in
+ * InitResultRelInfo().
+ */
+ target_resultRelInfo->ri_usesMultiInsert = true;
+ }
+
/* Verify the named relation is a valid target for INSERT */
CheckValidResultRel(resultRelInfo, CMD_INSERT);
@@ -2854,10 +2894,14 @@ CopyFrom(CopyState cstate)
mtstate->operation = CMD_INSERT;
mtstate->resultRelInfo = estate->es_result_relations;
- if (resultRelInfo->ri_FdwRoutine != NULL &&
- resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
- resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
- resultRelInfo);
+ /*
+ * Init COPY into foreign table. Initialization of copying into foreign
+ * partitions will be done later.
+ */
+ if (target_resultRelInfo->ri_FdwRoutine != NULL &&
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
+ resultRelInfo);
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -2886,83 +2930,9 @@ CopyFrom(CopyState cstate)
cstate->qualexpr = ExecInitQual(castNode(List, cstate->whereClause),
&mtstate->ps);
- /*
- * It's generally more efficient to prepare a bunch of tuples for
- * insertion, and insert them in one table_multi_insert() call, than call
- * table_tuple_insert() separately for every tuple. However, there are a
- * number of reasons why we might not be able to do this. These are
- * explained below.
- */
- if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_before_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_instead_row))
- {
- /*
- * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
- * triggers on the table. Such triggers might query the table we're
- * inserting into and act differently if the tuples that have already
- * been processed and prepared for insertion are not there.
- */
- insertMethod = CIM_SINGLE;
- }
- else if (proute != NULL && resultRelInfo->ri_TrigDesc != NULL &&
- resultRelInfo->ri_TrigDesc->trig_insert_new_table)
- {
- /*
- * For partitioned tables we can't support multi-inserts when there
- * are any statement level insert triggers. It might be possible to
- * allow partitioned tables with such triggers in the future, but for
- * now, CopyMultiInsertInfoFlush expects that any before row insert
- * and statement level insert triggers are on the same relation.
- */
- insertMethod = CIM_SINGLE;
- }
- else if (resultRelInfo->ri_FdwRoutine != NULL ||
- cstate->volatile_defexprs)
- {
- /*
- * Can't support multi-inserts to foreign tables or if there are any
- * volatile default expressions in the table. Similarly to the
- * trigger case above, such expressions may query the table we're
- * inserting into.
- *
- * Note: It does not matter if any partitions have any volatile
- * default expressions as we use the defaults from the target of the
- * COPY command.
- */
- insertMethod = CIM_SINGLE;
- }
- else if (contain_volatile_functions(cstate->whereClause))
- {
- /*
- * Can't support multi-inserts if there are any volatile function
- * expressions in WHERE clause. Similarly to the trigger case above,
- * such expressions may query the table we're inserting into.
- */
- insertMethod = CIM_SINGLE;
- }
- else
- {
- /*
- * For partitioned tables, we may still be able to perform bulk
- * inserts. However, the possibility of this depends on which types
- * of triggers exist on the partition. We must disable bulk inserts
- * if the partition is a foreign table or it has any before row insert
- * or insert instead triggers (same as we checked above for the parent
- * table). Since the partition's resultRelInfos are initialized only
- * when we actually need to insert the first tuple into them, we must
- * have the intermediate insert method of CIM_MULTI_CONDITIONAL to
- * flag that we must later determine if we can use bulk-inserts for
- * the partition being inserted into.
- */
- if (proute)
- insertMethod = CIM_MULTI_CONDITIONAL;
- else
- insertMethod = CIM_MULTI;
-
+ if (resultRelInfo->ri_usesMultiInsert)
CopyMultiInsertInfoInit(&multiInsertInfo, resultRelInfo, cstate,
estate, mycid, ti_options);
- }
/*
* If not using batch mode (which allocates slots as needed) set up a
@@ -2970,7 +2940,7 @@ CopyFrom(CopyState cstate)
* one, even if we might batch insert, to read the tuple in the root
* partition's form.
*/
- if (insertMethod == CIM_SINGLE || insertMethod == CIM_MULTI_CONDITIONAL)
+ if (!resultRelInfo->ri_usesMultiInsert || proute)
{
singleslot = table_slot_create(resultRelInfo->ri_RelationDesc,
&estate->es_tupleTable);
@@ -3013,7 +2983,7 @@ CopyFrom(CopyState cstate)
ResetPerTupleExprContext(estate);
/* select slot to (initially) load row into */
- if (insertMethod == CIM_SINGLE || proute)
+ if (!target_resultRelInfo->ri_usesMultiInsert || proute)
{
myslot = singleslot;
Assert(myslot != NULL);
@@ -3021,7 +2991,6 @@ CopyFrom(CopyState cstate)
else
{
Assert(resultRelInfo == target_resultRelInfo);
- Assert(insertMethod == CIM_MULTI);
myslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
resultRelInfo);
@@ -3080,24 +3049,14 @@ CopyFrom(CopyState cstate)
has_instead_insert_row_trig = (resultRelInfo->ri_TrigDesc &&
resultRelInfo->ri_TrigDesc->trig_insert_instead_row);
- /*
- * Disable multi-inserts when the partition has BEFORE/INSTEAD
- * OF triggers, or if the partition is a foreign partition.
- */
- leafpart_use_multi_insert = insertMethod == CIM_MULTI_CONDITIONAL &&
- !has_before_insert_row_trig &&
- !has_instead_insert_row_trig &&
- resultRelInfo->ri_FdwRoutine == NULL;
-
/* Set the multi-insert buffer to use for this partition. */
- if (leafpart_use_multi_insert)
+ if (resultRelInfo->ri_usesMultiInsert)
{
if (resultRelInfo->ri_CopyMultiInsertBuffer == NULL)
CopyMultiInsertInfoSetupBuffer(&multiInsertInfo,
resultRelInfo);
}
- else if (insertMethod == CIM_MULTI_CONDITIONAL &&
- !CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+ else if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
{
/*
* Flush pending inserts if this partition can't use
@@ -3149,7 +3108,7 @@ CopyFrom(CopyState cstate)
* rowtype.
*/
map = resultRelInfo->ri_PartitionInfo->pi_RootToPartitionMap;
- if (insertMethod == CIM_SINGLE || !leafpart_use_multi_insert)
+ if (!resultRelInfo->ri_usesMultiInsert)
{
/* non batch insert */
if (map != NULL)
@@ -3168,9 +3127,6 @@ CopyFrom(CopyState cstate)
*/
TupleTableSlot *batchslot;
- /* no other path available for partitioned table */
- Assert(insertMethod == CIM_MULTI_CONDITIONAL);
-
batchslot = CopyMultiInsertInfoNextFreeSlot(&multiInsertInfo,
resultRelInfo);
@@ -3241,7 +3197,7 @@ CopyFrom(CopyState cstate)
ExecPartitionCheck(resultRelInfo, myslot, estate, true);
/* Store the slot in the multi-insert buffer, when enabled. */
- if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
+ if (resultRelInfo->ri_usesMultiInsert)
{
/*
* The slot previously might point into the per-tuple
@@ -3316,11 +3272,8 @@ CopyFrom(CopyState cstate)
}
/* Flush any remaining buffered tuples */
- if (insertMethod != CIM_SINGLE)
- {
- if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
- CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
- }
+ if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
+ CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
/* Done, clean up */
error_context_stack = errcallback.previous;
@@ -3349,11 +3302,10 @@ CopyFrom(CopyState cstate)
if (target_resultRelInfo->ri_FdwRoutine != NULL &&
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
- target_resultRelInfo);
+ target_resultRelInfo);
/* Tear down the multi-insert buffer data */
- if (insertMethod != CIM_SINGLE)
- CopyMultiInsertInfoCleanup(&multiInsertInfo);
+ CopyMultiInsertInfoCleanup(&multiInsertInfo);
ExecCloseIndices(target_resultRelInfo);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 4fdffad6f3..12ee7f2b61 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -1345,6 +1345,9 @@ InitResultRelInfo(ResultRelInfo *resultRelInfo,
resultRelInfo->ri_PartitionRoot = partition_root;
resultRelInfo->ri_PartitionInfo = NULL; /* may be set later */
resultRelInfo->ri_CopyMultiInsertBuffer = NULL;
+
+ /* Define multi-insert mode possibility later if needed */
+ resultRelInfo->ri_usesMultiInsert = false;
}
/*
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index bd2ea25804..baaa0f61fa 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -548,6 +548,46 @@ ExecHashSubPlanResultRelsByOid(ModifyTableState *mtstate,
}
}
+bool
+checkMultiInsertMode(const ResultRelInfo *rri, const ResultRelInfo *parent)
+{
+ Assert(rri->ri_usesMultiInsert == false);
+
+ if (parent && !parent->ri_usesMultiInsert)
+ return false;
+
+ /* Check if the relation allows to use "multi-insert" mode. */
+ if (rri->ri_TrigDesc != NULL &&
+ (rri->ri_TrigDesc->trig_insert_before_row ||
+ rri->ri_TrigDesc->trig_insert_instead_row))
+ /*
+ * Can't support multi-inserts when there are any BEFORE/INSTEAD OF
+ * triggers on the table. Such triggers might query the table we're
+ * inserting into and act differently if the tuples that have already
+ * been processed and prepared for insertion are not there.
+ */
+ return false;
+
+ if (rri->ri_RelationDesc->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+ rri->ri_TrigDesc != NULL &&
+ rri->ri_TrigDesc->trig_insert_new_table)
+ /*
+ * For partitioned tables we can't support multi-inserts when there
+ * are any statement level insert triggers. It might be possible to
+ * allow partitioned tables with such triggers in the future, but for
+ * now, CopyMultiInsertInfoFlush expects that any before row insert
+ * and statement level insert triggers are on the same relation.
+ */
+ return false;
+
+ if (rri->ri_FdwRoutine != NULL)
+ /* Foreign tables don't support multi-inserts. */
+ return false;
+
+ /* OK, caller can use multi-insert on this relation. */
+ return true;
+}
+
/*
* ExecInitPartitionInfo
* Lock the partition and initialize ResultRelInfo. Also setup other
@@ -583,6 +623,13 @@ ExecInitPartitionInfo(ModifyTableState *mtstate, EState *estate,
rootrel,
estate->es_instrument);
+ /*
+ * Use multi-insert mode if the condition checking passes for the
+ * parent and its child.
+ */
+ leaf_part_rri->ri_usesMultiInsert =
+ checkMultiInsertMode(leaf_part_rri, rootResultRelInfo);
+
/*
* Verify result relation is a valid target for an INSERT. An UPDATE of a
* partition-key becomes a DELETE+INSERT operation, so this check is still
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index 6d1b722198..895bcd01c6 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -145,6 +145,8 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
PartitionTupleRouting *proute,
TupleTableSlot *slot,
EState *estate);
+extern bool checkMultiInsertMode(const ResultRelInfo *rri,
+ const ResultRelInfo *parent);
extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
PartitionTupleRouting *proute);
extern PartitionPruneState *ExecCreatePartitionPruneState(PlanState *planstate,
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 0b42dd6f94..89ae9afaa4 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -489,7 +489,14 @@ typedef struct ResultRelInfo
/* Additional information specific to partition tuple routing */
struct PartitionRoutingInfo *ri_PartitionInfo;
- /* For use by copy.c when performing multi-inserts */
+ /*
+ * The following fields are currently only relevant to copy.c.
+ *
+ * True if okay to use multi-insert on this relation
+ */
+ bool ri_usesMultiInsert;
+
+ /* Buffer allocated to this relation when using multi-insert mode */
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
} ResultRelInfo;
--
2.25.1
>From 3db62efd9b62581cab35189e83fccd9b6b7aebfc Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Thu, 10 Sep 2020 14:21:00 +0500
Subject: [PATCH 2/4] Fast COPY FROM into the foreign or sharded table.
This feature enables bulk COPY into foreign table in the case of
multi inserts is possible and foreign table has non-zero number of columns.
FDWAPI was extended by next routines:
* BeginForeignCopy
* EndForeignCopy
* ExecForeignCopy
BeginForeignCopy and EndForeignCopy initialize and free
the CopyState of bulk COPY. The ExecForeignCopy routine send
'COPY ... FROM STDIN' command to the foreign server, in iterative
manner send tuples by CopyTo() machinery, send EOF to this connection.
Code that constructed list of columns for a given foreign relation
in the deparseAnalyzeSql() routine is separated to the deparseRelColumnList().
It is reused in the deparseCopyFromSql().
Added TAP-tests on the specific corner cases of COPY FROM STDIN operation.
By the analogy of CopyFrom() the CopyState structure was extended
with data_dest_cb callback. It is used for send text representation
of a tuple to a custom destination.
The PgFdwModifyState structure is extended with the cstate field.
It is needed for avoid repeated initialization of CopyState. ALso for this
reason CopyTo() routine was split into the set of routines CopyToStart()/
CopyTo()/CopyToFinish().
Enum CopyInsertMethod removed. This logic implements by ri_usesMultiInsert
field of the ResultRelInfo sructure.
Discussion: https://www.postgresql.org/message-id/flat/3d0909dc-3691-a576-208a-90986e55489f%40postgrespro.ru
Authors: Andrey Lepikhov, Ashutosh Bapat, Amit Langote
---
contrib/postgres_fdw/deparse.c | 60 ++++-
.../postgres_fdw/expected/postgres_fdw.out | 46 +++-
contrib/postgres_fdw/postgres_fdw.c | 143 +++++++++++
contrib/postgres_fdw/postgres_fdw.h | 1 +
contrib/postgres_fdw/sql/postgres_fdw.sql | 45 ++++
doc/src/sgml/fdwhandler.sgml | 75 ++++++
src/backend/commands/copy.c | 225 +++++++++++-------
src/backend/executor/execPartition.c | 34 ++-
src/include/commands/copy.h | 11 +
src/include/foreign/fdwapi.h | 15 ++
10 files changed, 549 insertions(+), 106 deletions(-)
diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ad37a74221..a37981ff66 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -184,6 +184,8 @@ static void appendAggOrderBy(List *orderList, List *targetList,
static void appendFunctionName(Oid funcid, deparse_expr_cxt *context);
static Node *deparseSortGroupClause(Index ref, List *tlist, bool force_colno,
deparse_expr_cxt *context);
+static List *deparseRelColumnList(StringInfo buf, Relation rel,
+ bool enclose_in_parens);
/*
* Helper functions
@@ -1758,6 +1760,20 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
withCheckOptionList, returningList, retrieved_attrs);
}
+/*
+ * Deparse COPY FROM into given buf.
+ * We need to use list of parameters at each query.
+ */
+void
+deparseCopyFromSql(StringInfo buf, Relation rel)
+{
+ appendStringInfoString(buf, "COPY ");
+ deparseRelation(buf, rel);
+ (void) deparseRelColumnList(buf, rel, true);
+
+ appendStringInfoString(buf, " FROM STDIN ");
+}
+
/*
* deparse remote UPDATE statement
*
@@ -2061,6 +2077,30 @@ deparseAnalyzeSizeSql(StringInfo buf, Relation rel)
*/
void
deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
+{
+ appendStringInfoString(buf, "SELECT ");
+ *retrieved_attrs = deparseRelColumnList(buf, rel, false);
+
+ /* Don't generate bad syntax for zero-column relation. */
+ if (list_length(*retrieved_attrs) == 0)
+ appendStringInfoString(buf, "NULL");
+
+ /*
+ * Construct FROM clause
+ */
+ appendStringInfoString(buf, " FROM ");
+ deparseRelation(buf, rel);
+}
+
+/*
+ * Construct the list of columns of given foreign relation in the order they
+ * appear in the tuple descriptor of the relation. Ignore any dropped columns.
+ * Use column names on the foreign server instead of local names.
+ *
+ * Optionally enclose the list in parantheses.
+ */
+static List *
+deparseRelColumnList(StringInfo buf, Relation rel, bool enclose_in_parens)
{
Oid relid = RelationGetRelid(rel);
TupleDesc tupdesc = RelationGetDescr(rel);
@@ -2069,10 +2109,8 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
List *options;
ListCell *lc;
bool first = true;
+ List *retrieved_attrs = NIL;
- *retrieved_attrs = NIL;
-
- appendStringInfoString(buf, "SELECT ");
for (i = 0; i < tupdesc->natts; i++)
{
/* Ignore dropped columns. */
@@ -2081,6 +2119,9 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
if (!first)
appendStringInfoString(buf, ", ");
+ else if (enclose_in_parens)
+ appendStringInfoChar(buf, '(');
+
first = false;
/* Use attribute name or column_name option. */
@@ -2100,18 +2141,13 @@ deparseAnalyzeSql(StringInfo buf, Relation rel, List **retrieved_attrs)
appendStringInfoString(buf, quote_identifier(colname));
- *retrieved_attrs = lappend_int(*retrieved_attrs, i + 1);
+ retrieved_attrs = lappend_int(retrieved_attrs, i + 1);
}
- /* Don't generate bad syntax for zero-column relation. */
- if (first)
- appendStringInfoString(buf, "NULL");
+ if (enclose_in_parens && list_length(retrieved_attrs) > 0)
+ appendStringInfoChar(buf, ')');
- /*
- * Construct FROM clause
- */
- appendStringInfoString(buf, " FROM ");
- deparseRelation(buf, rel);
+ return retrieved_attrs;
}
/*
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 84bc0ee381..5206814f10 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8084,8 +8084,9 @@ copy rem2 from stdin;
copy rem2 from stdin; -- ERROR
ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive"
DETAIL: Failing row contains (-1, xyzzy).
-CONTEXT: remote SQL command: INSERT INTO public.loc2(f1, f2) VALUES ($1, $2)
-COPY rem2, line 1: "-1 xyzzy"
+CONTEXT: COPY loc2, line 1: "-1 xyzzy"
+remote SQL command: COPY public.loc2(f1, f2) FROM STDIN
+COPY rem2, line 2
select * from rem2;
f1 | f2
----+-----
@@ -8096,6 +8097,19 @@ select * from rem2;
alter foreign table rem2 drop constraint rem2_f1positive;
alter table loc2 drop constraint loc2_f1positive;
delete from rem2;
+create table foo (a int) partition by list (a);
+create table foo1 (like foo);
+create foreign table ffoo1 partition of foo for values in (1)
+ server loopback options (table_name 'foo1');
+create table foo2 (like foo);
+create foreign table ffoo2 partition of foo for values in (2)
+ server loopback options (table_name 'foo2');
+create function print_new_row() returns trigger language plpgsql as $$
+ begin raise notice '%', new; return new; end; $$;
+create trigger ffoo1_br_trig before insert on ffoo1
+ for each row execute function print_new_row();
+copy foo from stdin;
+NOTICE: (1)
-- Test local triggers
create trigger trig_stmt_before before insert on rem2
for each statement execute procedure trigger_func();
@@ -8204,6 +8218,34 @@ drop trigger rem2_trig_row_before on rem2;
drop trigger rem2_trig_row_after on rem2;
drop trigger loc2_trig_row_before_insert on loc2;
delete from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+ERROR: column "f1" of relation "loc2" does not exist
+CONTEXT: remote SQL command: COPY public.loc2(f1, f2) FROM STDIN
+COPY rem2, line 3
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+ f1 | f2
+----+----
+(0 rows)
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(2 rows)
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+select * from rem2;
+--
+(4 rows)
+
-- test COPY FROM with foreign table created in the same transaction
create table loc3 (f1 int, f2 text);
begin;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index a31abce7c9..9685e731e0 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -18,6 +18,7 @@
#include "access/sysattr.h"
#include "access/table.h"
#include "catalog/pg_class.h"
+#include "commands/copy.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
@@ -190,6 +191,7 @@ typedef struct PgFdwModifyState
/* for update row movement if subplan result rel */
struct PgFdwModifyState *aux_fmstate; /* foreign-insert state, if
* created */
+ CopyState cstate; /* foreign COPY state, if used */
} PgFdwModifyState;
/*
@@ -356,6 +358,13 @@ static void postgresBeginForeignInsert(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo);
static void postgresEndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo);
+static void postgresBeginForeignCopy(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo);
+static void postgresEndForeignCopy(EState *estate,
+ ResultRelInfo *resultRelInfo);
+static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots,
+ int nslots);
static int postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
ModifyTable *plan,
@@ -533,6 +542,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->EndForeignModify = postgresEndForeignModify;
routine->BeginForeignInsert = postgresBeginForeignInsert;
routine->EndForeignInsert = postgresEndForeignInsert;
+ routine->BeginForeignCopy = postgresBeginForeignCopy;
+ routine->EndForeignCopy = postgresEndForeignCopy;
+ routine->ExecForeignCopy = postgresExecForeignCopy;
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
routine->PlanDirectModify = postgresPlanDirectModify;
routine->BeginDirectModify = postgresBeginDirectModify;
@@ -2050,6 +2062,137 @@ postgresEndForeignInsert(EState *estate,
finish_foreign_modify(fmstate);
}
+static PgFdwModifyState *copy_fmstate = NULL;
+
+static void
+pgfdw_copy_dest_cb(void *buf, int len)
+{
+ PGconn *conn = copy_fmstate->conn;
+
+ if (PQputCopyData(conn, (char *) buf, len) <= 0)
+ {
+ PGresult *res = PQgetResult(conn);
+
+ pgfdw_report_error(ERROR, res, conn, true, copy_fmstate->query);
+ }
+}
+
+/*
+ *
+ * postgresBeginForeignCopy
+ * Begin an COPY operation on a foreign table
+ */
+static void
+postgresBeginForeignCopy(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo)
+{
+ PgFdwModifyState *fmstate;
+ StringInfoData sql;
+ RangeTblEntry *rte;
+ Relation rel = resultRelInfo->ri_RelationDesc;
+
+ rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state);
+ initStringInfo(&sql);
+ deparseCopyFromSql(&sql, rel);
+
+ fmstate = create_foreign_modify(mtstate->ps.state,
+ rte,
+ resultRelInfo,
+ CMD_INSERT,
+ NULL,
+ sql.data,
+ NIL,
+ false,
+ NIL);
+
+ fmstate->cstate = BeginCopyTo(NULL, NULL, RelationGetDescr(rel), NULL,
+ InvalidOid, NULL, false, pgfdw_copy_dest_cb,
+ NIL, NIL);
+ CopyToStart(fmstate->cstate);
+ resultRelInfo->ri_FdwState = fmstate;
+}
+
+/*
+ * postgresEndForeignCopy
+ * Finish an COPY operation on a foreign table
+ */
+static void
+postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+
+ /* Check correct use of CopyIn FDW API. */
+ Assert(fmstate->cstate != NULL);
+ CopyToFinish(fmstate->cstate);
+ pfree(fmstate->cstate);
+ fmstate->cstate = NULL;
+ finish_foreign_modify(fmstate);
+}
+
+/*
+ *
+ * postgresExecForeignCopy
+ * Send a number of tuples to the foreign relation.
+ */
+static void
+postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
+ TupleTableSlot **slots, int nslots)
+{
+ PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
+ PGresult *res;
+ PGconn *conn = fmstate->conn;
+ bool status = false;
+ int i;
+
+ /* Check correct use of CopyIn FDW API. */
+ Assert(fmstate->cstate != NULL);
+ Assert(copy_fmstate == NULL);
+
+ res = PQexec(conn, fmstate->query);
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
+ PQclear(res);
+
+ PG_TRY();
+ {
+ copy_fmstate = fmstate;
+ for (i = 0; i < nslots; i++)
+ CopyOneRowTo(fmstate->cstate, slots[i]);
+
+ status = true;
+ }
+ PG_FINALLY();
+ {
+ copy_fmstate = NULL; /* Detect problems */
+
+ /* Finish COPY IN protocol. It is needed to do after successful copy or
+ * after an error.
+ */
+ if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 ||
+ PQflush(conn))
+ ereport(ERROR,
+ (errmsg("error returned by PQputCopyEnd: %s",
+ PQerrorMessage(conn))));
+
+ /* After successfully sending an EOF signal, check command status. */
+ res = PQgetResult(conn);
+ if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) ||
+ (status && PQresultStatus(res) != PGRES_COMMAND_OK))
+ pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+
+ PQclear(res);
+ /* Do this to ensure we've pumped libpq back to idle state */
+ if (PQgetResult(conn) != NULL)
+ ereport(ERROR,
+ (errmsg("unexpected extra results during COPY of table: %s",
+ PQerrorMessage(conn))));
+
+ if (!status)
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+}
+
/*
* postgresIsForeignRelUpdatable
* Determine whether a foreign table supports INSERT, UPDATE and/or
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..8fc5ff018f 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -162,6 +162,7 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
List *targetAttrs, bool doNothing,
List *withCheckOptionList, List *returningList,
List **retrieved_attrs);
+extern void deparseCopyFromSql(StringInfo buf, Relation rel);
extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
List *targetAttrs,
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index d452d06343..1a56432f0f 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -2213,6 +2213,23 @@ alter table loc2 drop constraint loc2_f1positive;
delete from rem2;
+create table foo (a int) partition by list (a);
+create table foo1 (like foo);
+create foreign table ffoo1 partition of foo for values in (1)
+ server loopback options (table_name 'foo1');
+create table foo2 (like foo);
+create foreign table ffoo2 partition of foo for values in (2)
+ server loopback options (table_name 'foo2');
+create function print_new_row() returns trigger language plpgsql as $$
+ begin raise notice '%', new; return new; end; $$;
+create trigger ffoo1_br_trig before insert on ffoo1
+ for each row execute function print_new_row();
+
+copy foo from stdin;
+1
+2
+\.
+
-- Test local triggers
create trigger trig_stmt_before before insert on rem2
for each statement execute procedure trigger_func();
@@ -2313,6 +2330,34 @@ drop trigger loc2_trig_row_before_insert on loc2;
delete from rem2;
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+1 foo
+2 bar
+\.
+
+alter table loc2 add column f1 int;
+alter table loc2 add column f2 int;
+select * from rem2;
+
+-- dropped columns locally and on the foreign server
+alter table rem2 drop column f1;
+alter table rem2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
+alter table loc2 drop column f1;
+alter table loc2 drop column f2;
+copy rem2 from stdin;
+
+
+\.
+select * from rem2;
+
-- test COPY FROM with foreign table created in the same transaction
create table loc3 (f1 int, f2 text);
begin;
diff --git a/doc/src/sgml/fdwhandler.sgml b/doc/src/sgml/fdwhandler.sgml
index 72fa127212..81728945ea 100644
--- a/doc/src/sgml/fdwhandler.sgml
+++ b/doc/src/sgml/fdwhandler.sgml
@@ -796,6 +796,81 @@ EndForeignInsert(EState *estate,
<para>
<programlisting>
+void
+BeginForeignCopy(ModifyTableState *mtstate,
+ ResultRelInfo *rinfo);
+</programlisting>
+
+ Begin executing an copy operation on a foreign table. This routine is
+ called right before the first call of <function>ExecForeignCopy</function>
+ routine for the foreign table. It should perform any initialization needed
+ prior to the actual COPY FROM operation.
+ Subsequently, <function>ExecForeignCopy</function> will be called for
+ a bulk of tuples to be copied into the foreign table.
+ </para>
+
+ <para>
+ <literal>mtstate</literal> is the overall state of the
+ <structname>ModifyTable</structname> plan node being executed; global data about
+ the plan and execution state is available via this structure.
+ <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
+ the target foreign table. (The <structfield>ri_FdwState</structfield> field of
+ <structname>ResultRelInfo</structname> is available for the FDW to store any
+ private state it needs for this operation.)
+ </para>
+
+ <para>
+ When this is called by a <command>COPY FROM</command> command, the
+ plan-related global data in <literal>mtstate</literal> is not provided.
+ </para>
+
+ <para>
+ If the <function>BeginForeignCopy</function> pointer is set to
+ <literal>NULL</literal>, no action is taken for the initialization.
+ </para>
+
+ <para>
+<programlisting>
+void
+EndForeignCopy(EState *estate,
+ ResultRelInfo *rinfo);
+</programlisting>
+
+ End the copy operation and release resources. It is normally not important
+ to release palloc'd memory, but for example open files and connections
+ to remote servers should be cleaned up.
+ </para>
+
+ <para>
+ If the <function>EndForeignCopy</function> pointer is set to
+ <literal>NULL</literal>, no action is taken for the termination.
+ </para>
+
+ <para>
+<programlisting>
+TupleTableSlot *
+ExecForeignCopy(ResultRelInfo *rinfo,
+ TupleTableSlot **slots,
+ int nslots);
+</programlisting>
+
+ Copy a bulk of tuples into the foreign table.
+ <literal>estate</literal> is global execution state for the query.
+ <literal>rinfo</literal> is the <structname>ResultRelInfo</structname> struct describing
+ the target foreign table.
+ <literal>slots</literal> contains the tuples to be inserted; it will match the
+ row-type definition of the foreign table.
+ <literal>nslots</literal> cis a number of tuples in the <literal>slots</literal>
+ </para>
+
+ <para>
+ If the <function>ExecForeignCopy</function> pointer is set to
+ <literal>NULL</literal>, attempts to insert into the foreign table will fail
+ with an error message.
+ </para>
+
+ <para>
+<programlisting>
int
IsForeignRelUpdatable(Relation rel);
</programlisting>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 2119db4213..02a034fb37 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -118,11 +118,14 @@ typedef struct CopyStateData
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
+ TupleDesc tupDesc; /* COPY TO will be used for manual tuple copying
+ * into the destination */
QueryDesc *queryDesc; /* executable query to copy from */
List *attnumlist; /* integer list of attnums to copy */
char *filename; /* filename, or NULL for STDIN/STDOUT */
bool is_program; /* is 'filename' a program to popen? */
copy_data_source_cb data_source_cb; /* function for reading data */
+ copy_data_dest_cb data_dest_cb; /* function for writing data */
bool binary; /* binary format? */
bool freeze; /* freeze rows on loading? */
bool csv_mode; /* Comma Separated Value format? */
@@ -349,17 +352,12 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* non-export function prototypes */
static CopyState BeginCopy(ParseState *pstate, bool is_from, Relation rel,
- RawStmt *raw_query, Oid queryRelId, List *attnamelist,
- List *options);
+ TupleDesc srcTupDesc, RawStmt *raw_query,
+ Oid queryRelId, List *attnamelist, List *options);
static void EndCopy(CopyState cstate);
static void ClosePipeToProgram(CopyState cstate);
-static CopyState BeginCopyTo(ParseState *pstate, Relation rel, RawStmt *query,
- Oid queryRelId, const char *filename, bool is_program,
- List *attnamelist, List *options);
-static void EndCopyTo(CopyState cstate);
static uint64 DoCopyTo(CopyState cstate);
static uint64 CopyTo(CopyState cstate);
-static void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
static bool CopyReadLine(CopyState cstate);
static bool CopyReadLineText(CopyState cstate);
static int CopyReadAttributesText(CopyState cstate);
@@ -585,7 +583,8 @@ CopySendEndOfRow(CopyState cstate)
(void) pq_putmessage('d', fe_msgbuf->data, fe_msgbuf->len);
break;
case COPY_CALLBACK:
- Assert(false); /* Not yet supported. */
+ CopySendChar(cstate, '\n');
+ cstate->data_dest_cb(fe_msgbuf->data, fe_msgbuf->len);
break;
}
@@ -1114,8 +1113,8 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
}
else
{
- cstate = BeginCopyTo(pstate, rel, query, relid,
- stmt->filename, stmt->is_program,
+ cstate = BeginCopyTo(pstate, rel, NULL, query, relid,
+ stmt->filename, stmt->is_program, NULL,
stmt->attlist, stmt->options);
*processed = DoCopyTo(cstate); /* copy from database to file */
EndCopyTo(cstate);
@@ -1497,6 +1496,7 @@ static CopyState
BeginCopy(ParseState *pstate,
bool is_from,
Relation rel,
+ TupleDesc srcTupDesc,
RawStmt *raw_query,
Oid queryRelId,
List *attnamelist,
@@ -1532,6 +1532,11 @@ BeginCopy(ParseState *pstate,
tupDesc = RelationGetDescr(cstate->rel);
}
+ else if (srcTupDesc)
+ {
+ Assert(!raw_query && !is_from);
+ tupDesc = cstate->tupDesc = srcTupDesc;
+ }
else
{
List *rewritten;
@@ -1858,20 +1863,25 @@ EndCopy(CopyState cstate)
/*
* Setup CopyState to read tuples from a table or a query for COPY TO.
*/
-static CopyState
+CopyState
BeginCopyTo(ParseState *pstate,
Relation rel,
+ TupleDesc tupDesc,
RawStmt *query,
Oid queryRelId,
const char *filename,
bool is_program,
+ copy_data_dest_cb data_dest_cb,
List *attnamelist,
List *options)
{
CopyState cstate;
- bool pipe = (filename == NULL);
+ bool pipe = (filename == NULL) && (data_dest_cb == NULL);
MemoryContext oldcontext;
+ /* Impossible to mix CopyTo modes */
+ Assert(rel == NULL || tupDesc == NULL);
+
if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
{
if (rel->rd_rel->relkind == RELKIND_VIEW)
@@ -1910,8 +1920,9 @@ BeginCopyTo(ParseState *pstate,
RelationGetRelationName(rel))));
}
- cstate = BeginCopy(pstate, false, rel, query, queryRelId, attnamelist,
- options);
+ cstate = BeginCopy(pstate, false, rel, tupDesc, query, queryRelId,
+ attnamelist, options);
+
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
if (pipe)
@@ -1920,6 +1931,11 @@ BeginCopyTo(ParseState *pstate,
if (whereToSendOutput != DestRemote)
cstate->copy_file = stdout;
}
+ else if (data_dest_cb)
+ {
+ cstate->copy_dest = COPY_CALLBACK;
+ cstate->data_dest_cb = data_dest_cb;
+ }
else
{
cstate->filename = pstrdup(filename);
@@ -2006,7 +2022,9 @@ DoCopyTo(CopyState cstate)
if (fe_copy)
SendCopyBegin(cstate);
+ CopyToStart(cstate);
processed = CopyTo(cstate);
+ CopyToFinish(cstate);
if (fe_copy)
SendCopyEnd(cstate);
@@ -2029,7 +2047,7 @@ DoCopyTo(CopyState cstate)
/*
* Clean up storage and release resources for COPY TO.
*/
-static void
+void
EndCopyTo(CopyState cstate)
{
if (cstate->queryDesc != NULL)
@@ -2045,19 +2063,22 @@ EndCopyTo(CopyState cstate)
EndCopy(cstate);
}
-/*
- * Copy from relation or query TO file.
+/* Start COPY TO operation.
+ * Separated to the routine to prevent duplicate operations in the case of
+ * manual mode, where tuples are copied to the destination one by one, by call of
+ * the CopyOneRowTo() routine.
*/
-static uint64
-CopyTo(CopyState cstate)
+void
+CopyToStart(CopyState cstate)
{
TupleDesc tupDesc;
int num_phys_attrs;
ListCell *cur;
- uint64 processed;
if (cstate->rel)
tupDesc = RelationGetDescr(cstate->rel);
+ else if (cstate->tupDesc)
+ tupDesc = cstate->tupDesc;
else
tupDesc = cstate->queryDesc->tupDesc;
num_phys_attrs = tupDesc->natts;
@@ -2144,6 +2165,32 @@ CopyTo(CopyState cstate)
CopySendEndOfRow(cstate);
}
}
+}
+
+/*
+ * Finish COPY TO operation.
+ */
+void
+CopyToFinish(CopyState cstate)
+{
+ if (cstate->binary)
+ {
+ /* Generate trailer for a binary copy */
+ CopySendInt16(cstate, -1);
+ /* Need to flush out the trailer */
+ CopySendEndOfRow(cstate);
+ }
+
+ MemoryContextDelete(cstate->rowcontext);
+}
+
+/*
+ * Copy from relation or query TO file.
+ */
+static uint64
+CopyTo(CopyState cstate)
+{
+ uint64 processed;
if (cstate->rel)
{
@@ -2175,24 +2222,13 @@ CopyTo(CopyState cstate)
ExecutorRun(cstate->queryDesc, ForwardScanDirection, 0L, true);
processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
}
-
- if (cstate->binary)
- {
- /* Generate trailer for a binary copy */
- CopySendInt16(cstate, -1);
- /* Need to flush out the trailer */
- CopySendEndOfRow(cstate);
- }
-
- MemoryContextDelete(cstate->rowcontext);
-
return processed;
}
/*
* Emit one row during CopyTo().
*/
-static void
+void
CopyOneRowTo(CopyState cstate, TupleTableSlot *slot)
{
bool need_delim = false;
@@ -2485,53 +2521,64 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
cstate->line_buf_valid = false;
save_cur_lineno = cstate->cur_lineno;
- /*
- * table_multi_insert may leak memory, so switch to short-lived memory
- * context before calling it.
- */
- oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
- table_multi_insert(resultRelInfo->ri_RelationDesc,
- slots,
- nused,
- mycid,
- ti_options,
- buffer->bistate);
- MemoryContextSwitchTo(oldcontext);
-
- for (i = 0; i < nused; i++)
+ if (resultRelInfo->ri_RelationDesc->rd_rel->relkind == RELKIND_FOREIGN_TABLE)
+ {
+ /* Flush into foreign table or partition */
+ resultRelInfo->ri_FdwRoutine->ExecForeignCopy(resultRelInfo,
+ slots,
+ nused);
+ }
+ else
{
/*
- * If there are any indexes, update them for all the inserted tuples,
- * and run AFTER ROW INSERT triggers.
+ * table_multi_insert may leak memory, so switch to short-lived memory
+ * context before calling it.
*/
- if (resultRelInfo->ri_NumIndices > 0)
- {
- List *recheckIndexes;
-
- cstate->cur_lineno = buffer->linenos[i];
- recheckIndexes =
- ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
- NIL);
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], recheckIndexes,
- cstate->transition_capture);
- list_free(recheckIndexes);
- }
+ oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+ table_multi_insert(resultRelInfo->ri_RelationDesc,
+ slots,
+ nused,
+ mycid,
+ ti_options,
+ buffer->bistate);
+ MemoryContextSwitchTo(oldcontext);
- /*
- * There's no indexes, but see if we need to run AFTER ROW INSERT
- * triggers anyway.
- */
- else if (resultRelInfo->ri_TrigDesc != NULL &&
- (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
- resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ for (i = 0; i < nused; i++)
{
- cstate->cur_lineno = buffer->linenos[i];
- ExecARInsertTriggers(estate, resultRelInfo,
- slots[i], NIL, cstate->transition_capture);
- }
+ /*
+ * If there are any indexes, update them for all the inserted tuples,
+ * and run AFTER ROW INSERT triggers.
+ */
+ if (resultRelInfo->ri_NumIndices > 0)
+ {
+ List *recheckIndexes;
+
+ cstate->cur_lineno = buffer->linenos[i];
+ recheckIndexes =
+ ExecInsertIndexTuples(buffer->slots[i], estate, false, NULL,
+ NIL);
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], recheckIndexes,
+ cstate->transition_capture);
+ list_free(recheckIndexes);
+ }
- ExecClearTuple(slots[i]);
+ /*
+ * There's no indexes, but see if we need to run AFTER ROW INSERT
+ * triggers anyway.
+ */
+ else if (resultRelInfo->ri_TrigDesc != NULL &&
+ (resultRelInfo->ri_TrigDesc->trig_insert_after_row ||
+ resultRelInfo->ri_TrigDesc->trig_insert_new_table))
+ {
+ cstate->cur_lineno = buffer->linenos[i];
+ ExecARInsertTriggers(estate, resultRelInfo,
+ slots[i], NIL, cstate->transition_capture);
+ }
+
+ ExecClearTuple(slots[i]);
+ }
}
/* Mark that all slots are free */
@@ -2895,13 +2942,18 @@ CopyFrom(CopyState cstate)
mtstate->resultRelInfo = estate->es_result_relations;
/*
- * Init COPY into foreign table. Initialization of copying into foreign
- * partitions will be done later.
+ * Init COPY into foreign table.
*/
- if (target_resultRelInfo->ri_FdwRoutine != NULL &&
- target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
- target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
- resultRelInfo);
+ if (target_resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (target_resultRelInfo->ri_usesMultiInsert)
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate,
+ resultRelInfo);
+ else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
+ resultRelInfo);
+ }
+
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -3299,10 +3351,16 @@ CopyFrom(CopyState cstate)
ExecResetTupleTable(estate->es_tupleTable, false);
/* Allow the FDW to shut down */
- if (target_resultRelInfo->ri_FdwRoutine != NULL &&
- target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
- target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
+ if (target_resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (target_resultRelInfo->ri_usesMultiInsert &&
+ target_resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL)
+ target_resultRelInfo->ri_FdwRoutine->EndForeignCopy(estate,
target_resultRelInfo);
+ else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+ target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
+ target_resultRelInfo);
+ }
/* Tear down the multi-insert buffer data */
CopyMultiInsertInfoCleanup(&multiInsertInfo);
@@ -3354,7 +3412,8 @@ BeginCopyFrom(ParseState *pstate,
MemoryContext oldcontext;
bool volatile_defexprs;
- cstate = BeginCopy(pstate, true, rel, NULL, InvalidOid, attnamelist, options);
+ cstate = BeginCopy(pstate, true, rel, NULL, NULL, InvalidOid, attnamelist,
+ options);
oldcontext = MemoryContextSwitchTo(cstate->copycontext);
/* Initialize state variables */
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index baaa0f61fa..581498cf6c 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -580,8 +580,12 @@ checkMultiInsertMode(const ResultRelInfo *rri, const ResultRelInfo *parent)
*/
return false;
- if (rri->ri_FdwRoutine != NULL)
- /* Foreign tables don't support multi-inserts. */
+ if (rri->ri_FdwRoutine != NULL &&
+ rri->ri_FdwRoutine->ExecForeignCopy == NULL)
+ /*
+ * Foreign tables don't support multi-inserts, unless their FDW
+ * provides the necessary COPY interface.
+ */
return false;
/* OK, caller can use multi-insert on this relation. */
@@ -1041,9 +1045,13 @@ ExecInitRoutingInfo(ModifyTableState *mtstate,
* If the partition is a foreign table, let the FDW init itself for
* routing tuples to the partition.
*/
- if (partRelInfo->ri_FdwRoutine != NULL &&
- partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
- partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
+ if (partRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (partRelInfo->ri_usesMultiInsert)
+ partRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate, partRelInfo);
+ else if (partRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+ partRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate, partRelInfo);
+ }
partRelInfo->ri_PartitionInfo = partrouteinfo;
partRelInfo->ri_CopyMultiInsertBuffer = NULL;
@@ -1245,10 +1253,18 @@ ExecCleanupTupleRouting(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo = proute->partitions[i];
/* Allow any FDWs to shut down */
- if (resultRelInfo->ri_FdwRoutine != NULL &&
- resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
- resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
- resultRelInfo);
+ if (resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (resultRelInfo->ri_usesMultiInsert)
+ {
+ Assert(resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL);
+ resultRelInfo->ri_FdwRoutine->EndForeignCopy(mtstate->ps.state,
+ resultRelInfo);
+ }
+ else if (resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+ resultRelInfo->ri_FdwRoutine->EndForeignInsert(mtstate->ps.state,
+ resultRelInfo);
+ }
/*
* Check if this result rel is one belonging to the node's subplans,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..08309149ea 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -22,6 +22,7 @@
/* CopyStateData is private in commands/copy.c */
typedef struct CopyStateData *CopyState;
typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
+typedef void (*copy_data_dest_cb) (void *outbuf, int len);
extern void DoCopy(ParseState *state, const CopyStmt *stmt,
int stmt_location, int stmt_len,
@@ -39,6 +40,16 @@ extern void CopyFromErrorCallback(void *arg);
extern uint64 CopyFrom(CopyState cstate);
+extern CopyState BeginCopyTo(ParseState *pstate, Relation rel,
+ TupleDesc tupDesc, RawStmt *query,
+ Oid queryRelId, const char *filename, bool is_program,
+ copy_data_dest_cb data_dest_cb, List *attnamelist,
+ List *options);
+extern void EndCopyTo(CopyState cstate);
+extern void CopyOneRowTo(CopyState cstate, TupleTableSlot *slot);
+extern void CopyToStart(CopyState cstate);
+extern void CopyToFinish(CopyState cstate);
+
extern DestReceiver *CreateCopyDestReceiver(void);
#endif /* COPY_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 95556dfb15..e932bdf2f4 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -104,6 +104,16 @@ typedef void (*BeginForeignInsert_function) (ModifyTableState *mtstate,
typedef void (*EndForeignInsert_function) (EState *estate,
ResultRelInfo *rinfo);
+typedef void (*BeginForeignCopy_function) (ModifyTableState *mtstate,
+ ResultRelInfo *rinfo);
+
+typedef void (*EndForeignCopy_function) (EState *estate,
+ ResultRelInfo *rinfo);
+
+typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo,
+ TupleTableSlot **slots,
+ int nslots);
+
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
typedef bool (*PlanDirectModify_function) (PlannerInfo *root,
@@ -220,6 +230,11 @@ typedef struct FdwRoutine
IterateDirectModify_function IterateDirectModify;
EndDirectModify_function EndDirectModify;
+ /* COPY a bulk of tuples into a foreign relation */
+ BeginForeignCopy_function BeginForeignCopy;
+ EndForeignCopy_function EndForeignCopy;
+ ExecForeignCopy_function ExecForeignCopy;
+
/* Functions for SELECT FOR UPDATE/SHARE row locking */
GetForeignRowMarkType_function GetForeignRowMarkType;
RefetchForeignRow_function RefetchForeignRow;
--
2.25.1
>From 1fc2ed184dd218809dd8bf3d2399bc05eddeb9f2 Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Tue, 8 Sep 2020 14:30:03 +0500
Subject: [PATCH 3/4] Add separated connections into the postgres_fdw.
Foreign Copy and someone other may want to use FDW connection
that hasn't shared with anyone else.
---
contrib/postgres_fdw/connection.c | 26 +++++++++++++------
contrib/postgres_fdw/postgres_fdw.c | 39 ++++++++++++++++-------------
contrib/postgres_fdw/postgres_fdw.h | 3 ++-
3 files changed, 43 insertions(+), 25 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 08daf26fdf..048c641e85 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -44,7 +44,11 @@
* ourselves, so that rolling back a subtransaction will kill the right
* queries and not the wrong ones.
*/
-typedef Oid ConnCacheKey;
+typedef struct ConnCacheKey
+{
+ Oid user;
+ int cid;
+} ConnCacheKey;
typedef struct ConnCacheEntry
{
@@ -65,6 +69,7 @@ typedef struct ConnCacheEntry
* Connection cache (initialized on first use)
*/
static HTAB *ConnectionHash = NULL;
+static int SeparatedConnNum = 0;
/* for assigning cursor numbers and prepared statement numbers */
static unsigned int cursor_number = 0;
@@ -105,9 +110,9 @@ static bool UserMappingPasswordRequired(UserMapping *user);
* (not even on error), we need this flag to cue manual cleanup.
*/
PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, bool separate)
{
- bool found;
+ bool found = false;
ConnCacheEntry *entry;
ConnCacheKey key;
@@ -141,7 +146,8 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
xact_got_connection = true;
/* Create hash key for the entry. Assume no pad bytes in key struct */
- key = user->umid;
+ key.user = user->umid;
+ key.cid = separate ? ++SeparatedConnNum : 0;
/*
* Find or create cached entry for requested connection.
@@ -870,10 +876,16 @@ pgfdw_xact_callback(XactEvent event, void *arg)
*/
if (PQstatus(entry->conn) != CONNECTION_OK ||
PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
- entry->changing_xact_state)
+ entry->changing_xact_state || entry->key.cid > 0)
{
elog(DEBUG3, "discarding connection %p", entry->conn);
disconnect_pg_server(entry);
+
+ if (entry->key.cid > 0)
+ {
+ hash_search(ConnectionHash, &entry->key, HASH_REMOVE, NULL);
+ SeparatedConnNum--;
+ }
}
}
@@ -1057,9 +1069,9 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
/* find server name to be shown in the message below */
tup = SearchSysCache1(USERMAPPINGOID,
- ObjectIdGetDatum(entry->key));
+ ObjectIdGetDatum(entry->key.user));
if (!HeapTupleIsValid(tup))
- elog(ERROR, "cache lookup failed for user mapping %u", entry->key);
+ elog(ERROR, "cache lookup failed for user mapping %u", entry->key.user);
umform = (Form_pg_user_mapping) GETSTRUCT(tup);
server = GetForeignServer(umform->umserver);
ReleaseSysCache(tup);
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9685e731e0..8bca71e3f5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -438,7 +438,8 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
char *query,
List *target_attrs,
bool has_returning,
- List *retrieved_attrs);
+ List *retrieved_attrs,
+ bool separate_conn);
static TupleTableSlot *execute_foreign_modify(EState *estate,
ResultRelInfo *resultRelInfo,
CmdType operation,
@@ -450,7 +451,7 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
-static void finish_foreign_modify(PgFdwModifyState *fmstate);
+static void finish_foreign_modify(PgFdwModifyState *fmstate, bool separate_conn);
static List *build_remote_returning(Index rtindex, Relation rel,
List *returningList);
static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
@@ -1445,7 +1446,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, false);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1840,7 +1841,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
query,
target_attrs,
has_returning,
- retrieved_attrs);
+ retrieved_attrs,
+ false);
resultRelInfo->ri_FdwState = fmstate;
}
@@ -1916,7 +1918,7 @@ postgresEndForeignModify(EState *estate,
return;
/* Destroy the execution state */
- finish_foreign_modify(fmstate);
+ finish_foreign_modify(fmstate, false);
}
/*
@@ -2022,7 +2024,8 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
sql.data,
targetAttrs,
retrieved_attrs != NIL,
- retrieved_attrs);
+ retrieved_attrs,
+ false);
/*
* If the given resultRelInfo already has PgFdwModifyState set, it means
@@ -2059,7 +2062,7 @@ postgresEndForeignInsert(EState *estate,
fmstate = fmstate->aux_fmstate;
/* Destroy the execution state */
- finish_foreign_modify(fmstate);
+ finish_foreign_modify(fmstate, false);
}
static PgFdwModifyState *copy_fmstate = NULL;
@@ -2103,7 +2106,8 @@ postgresBeginForeignCopy(ModifyTableState *mtstate,
sql.data,
NIL,
false,
- NIL);
+ NIL,
+ true);
fmstate->cstate = BeginCopyTo(NULL, NULL, RelationGetDescr(rel), NULL,
InvalidOid, NULL, false, pgfdw_copy_dest_cb,
@@ -2126,7 +2130,7 @@ postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo)
CopyToFinish(fmstate->cstate);
pfree(fmstate->cstate);
fmstate->cstate = NULL;
- finish_foreign_modify(fmstate);
+ finish_foreign_modify(fmstate, true);
}
/*
@@ -2514,7 +2518,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, false);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
@@ -2888,7 +2892,7 @@ estimate_path_cost_size(PlannerInfo *root,
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, false);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -3680,7 +3684,8 @@ create_foreign_modify(EState *estate,
char *query,
List *target_attrs,
bool has_returning,
- List *retrieved_attrs)
+ List *retrieved_attrs,
+ bool separate_conn)
{
PgFdwModifyState *fmstate;
Relation rel = resultRelInfo->ri_RelationDesc;
@@ -3708,7 +3713,7 @@ create_foreign_modify(EState *estate,
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, separate_conn);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
@@ -4014,7 +4019,7 @@ store_returning_result(PgFdwModifyState *fmstate,
* Release resources for a foreign insert/update/delete operation
*/
static void
-finish_foreign_modify(PgFdwModifyState *fmstate)
+finish_foreign_modify(PgFdwModifyState *fmstate, bool separate_conn)
{
Assert(fmstate != NULL);
@@ -4583,7 +4588,7 @@ postgresAnalyzeForeignTable(Relation relation,
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, false);
/*
* Construct command to get page count for relation.
@@ -4669,7 +4674,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, false);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -4897,7 +4902,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, false);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index 8fc5ff018f..95cf6487a2 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -129,7 +129,8 @@ extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
/* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+ bool separate);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
--
2.25.1
>From ff8f0686abb2e37468d6ce71968a51ada9919674 Mon Sep 17 00:00:00 2001
From: Andrey Lepikhov <a.lepik...@postgrespro.ru>
Date: Thu, 10 Sep 2020 14:37:18 +0500
Subject: [PATCH 4/4] Optimized version of the 'Fast COPY FROM' feature.
Execute remote query 'COPY .. FROM STDIN' once for each foreign
partition (table) in the BeginForeignCopy() routine.
TODO:
1. reporting on errors need to remake. Here is differences from
the way of INSERT query on each row: we can find out error event after
sending END of copy command.
2. It is necessary to examine all possible ways in which an error may
occur during the COPY FROM operation.
---
contrib/postgres_fdw/connection.c | 15 ++++
.../postgres_fdw/expected/postgres_fdw.out | 4 +-
contrib/postgres_fdw/postgres_fdw.c | 81 +++++++++----------
src/backend/commands/copy.c | 51 ++++++------
src/include/foreign/fdwapi.h | 8 +-
5 files changed, 86 insertions(+), 73 deletions(-)
diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 048c641e85..8409cea40b 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -827,6 +827,21 @@ pgfdw_xact_callback(XactEvent event, void *arg)
/* Assume we might have lost track of prepared statements */
entry->have_error = true;
+ if (entry->conn && PQstatus(entry->conn) == CONNECTION_OK)
+ {
+ /* Process special case of the unfinished COPY command */
+ res = PQgetResult(entry->conn);
+ if (PQresultStatus(res) == PGRES_COPY_IN &&
+ (PQputCopyEnd(entry->conn, _("canceled by server")) <= 0 ||
+ PQflush(entry->conn)))
+ {
+ ereport(ERROR,
+ (errmsg("error returned by PQputCopyEnd: %s",
+ PQerrorMessage(entry->conn))));
+ }
+ PQclear(res);
+ }
+
/*
* If a command has been submitted to the remote server by
* using an asynchronous execution function, the command
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 5206814f10..ef9b903b58 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -8086,7 +8086,7 @@ ERROR: new row for relation "loc2" violates check constraint "loc2_f1positive"
DETAIL: Failing row contains (-1, xyzzy).
CONTEXT: COPY loc2, line 1: "-1 xyzzy"
remote SQL command: COPY public.loc2(f1, f2) FROM STDIN
-COPY rem2, line 2
+COPY rem2, line 2: ""
select * from rem2;
f1 | f2
----+-----
@@ -8223,7 +8223,7 @@ alter table loc2 drop column f2;
copy rem2 from stdin;
ERROR: column "f1" of relation "loc2" does not exist
CONTEXT: remote SQL command: COPY public.loc2(f1, f2) FROM STDIN
-COPY rem2, line 3
+COPY rem2, line 0
alter table loc2 add column f1 int;
alter table loc2 add column f2 int;
select * from rem2;
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 8bca71e3f5..6006c359f9 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -359,12 +359,12 @@ static void postgresBeginForeignInsert(ModifyTableState *mtstate,
static void postgresEndForeignInsert(EState *estate,
ResultRelInfo *resultRelInfo);
static void postgresBeginForeignCopy(ModifyTableState *mtstate,
- ResultRelInfo *resultRelInfo);
-static void postgresEndForeignCopy(EState *estate,
ResultRelInfo *resultRelInfo);
+static void postgresEndForeignCopy(EState *estate,
+ ResultRelInfo *resultRelInfo);
static void postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
- TupleTableSlot **slots,
- int nslots);
+ TupleTableSlot **slots,
+ int nslots);
static int postgresIsForeignRelUpdatable(Relation rel);
static bool postgresPlanDirectModify(PlannerInfo *root,
ModifyTable *plan,
@@ -2093,6 +2093,7 @@ postgresBeginForeignCopy(ModifyTableState *mtstate,
StringInfoData sql;
RangeTblEntry *rte;
Relation rel = resultRelInfo->ri_RelationDesc;
+ PGresult *res;
rte = exec_rt_fetch(resultRelInfo->ri_RangeTableIndex, mtstate->ps.state);
initStringInfo(&sql);
@@ -2114,6 +2115,14 @@ postgresBeginForeignCopy(ModifyTableState *mtstate,
NIL, NIL);
CopyToStart(fmstate->cstate);
resultRelInfo->ri_FdwState = fmstate;
+
+ /*
+ * Start COPY operation. We may do so because we got a separate connection.
+ */
+ res = PQexec(fmstate->conn, fmstate->query);
+ if (PQresultStatus(res) != PGRES_COPY_IN)
+ pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ PQclear(res);
}
/*
@@ -2124,6 +2133,28 @@ static void
postgresEndForeignCopy(EState *estate, ResultRelInfo *resultRelInfo)
{
PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ PGresult *res;
+ PGconn *conn = fmstate->conn;
+
+ /* Finish COPY IN protocol. It is needed to do after successful copy or
+ * after an error.
+ */
+ if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
+ ereport(ERROR,
+ (errmsg("error returned by PQputCopyEnd: %s",
+ PQerrorMessage(conn))));
+
+ /* After successfully sending an EOF signal, check command status. */
+ res = PQgetResult(conn);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+
+ PQclear(res);
+ /* Do this to ensure we've pumped libpq back to idle state */
+ if (PQgetResult(conn) != NULL)
+ ereport(ERROR,
+ (errmsg("unexpected extra results during COPY of table: %s",
+ PQerrorMessage(conn))));
/* Check correct use of CopyIn FDW API. */
Assert(fmstate->cstate != NULL);
@@ -2143,58 +2174,26 @@ postgresExecForeignCopy(ResultRelInfo *resultRelInfo,
TupleTableSlot **slots, int nslots)
{
PgFdwModifyState *fmstate = resultRelInfo->ri_FdwState;
- PGresult *res;
- PGconn *conn = fmstate->conn;
- bool status = false;
int i;
- /* Check correct use of CopyIn FDW API. */
+ /* Check correct use of Copy FDW API. */
Assert(fmstate->cstate != NULL);
Assert(copy_fmstate == NULL);
- res = PQexec(conn, fmstate->query);
- if (PQresultStatus(res) != PGRES_COPY_IN)
- pgfdw_report_error(ERROR, res, conn, true, fmstate->query);
- PQclear(res);
-
PG_TRY();
{
copy_fmstate = fmstate;
for (i = 0; i < nslots; i++)
CopyOneRowTo(fmstate->cstate, slots[i]);
-
- status = true;
}
- PG_FINALLY();
+ PG_CATCH();
{
copy_fmstate = NULL; /* Detect problems */
-
- /* Finish COPY IN protocol. It is needed to do after successful copy or
- * after an error.
- */
- if (PQputCopyEnd(conn, status ? NULL : _("canceled by server")) <= 0 ||
- PQflush(conn))
- ereport(ERROR,
- (errmsg("error returned by PQputCopyEnd: %s",
- PQerrorMessage(conn))));
-
- /* After successfully sending an EOF signal, check command status. */
- res = PQgetResult(conn);
- if ((!status && PQresultStatus(res) != PGRES_FATAL_ERROR) ||
- (status && PQresultStatus(res) != PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
-
- PQclear(res);
- /* Do this to ensure we've pumped libpq back to idle state */
- if (PQgetResult(conn) != NULL)
- ereport(ERROR,
- (errmsg("unexpected extra results during COPY of table: %s",
- PQerrorMessage(conn))));
-
- if (!status)
- PG_RE_THROW();
+ PG_RE_THROW();
}
PG_END_TRY();
+
+ copy_fmstate = NULL;
}
/*
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 02a034fb37..02487c9742 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -2941,20 +2941,6 @@ CopyFrom(CopyState cstate)
mtstate->operation = CMD_INSERT;
mtstate->resultRelInfo = estate->es_result_relations;
- /*
- * Init COPY into foreign table.
- */
- if (target_resultRelInfo->ri_FdwRoutine != NULL)
- {
- if (target_resultRelInfo->ri_usesMultiInsert)
- target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate,
- resultRelInfo);
- else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
- target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
- resultRelInfo);
- }
-
-
/* Prepare to catch AFTER triggers. */
AfterTriggerBeginQuery();
@@ -3021,6 +3007,19 @@ CopyFrom(CopyState cstate)
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
+ /*
+ * Init COPY into foreign table.
+ */
+ if (target_resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (target_resultRelInfo->ri_usesMultiInsert)
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignCopy(mtstate,
+ resultRelInfo);
+ else if (target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert != NULL)
+ target_resultRelInfo->ri_FdwRoutine->BeginForeignInsert(mtstate,
+ resultRelInfo);
+ }
+
for (;;)
{
TupleTableSlot *myslot;
@@ -3327,6 +3326,18 @@ CopyFrom(CopyState cstate)
if (!CopyMultiInsertInfoIsEmpty(&multiInsertInfo))
CopyMultiInsertInfoFlush(&multiInsertInfo, NULL);
+ /* Allow the FDW to shut down */
+ if (target_resultRelInfo->ri_FdwRoutine != NULL)
+ {
+ if (target_resultRelInfo->ri_usesMultiInsert &&
+ target_resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL)
+ target_resultRelInfo->ri_FdwRoutine->EndForeignCopy(estate,
+ target_resultRelInfo);
+ else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
+ target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
+ target_resultRelInfo);
+ }
+
/* Done, clean up */
error_context_stack = errcallback.previous;
@@ -3350,18 +3361,6 @@ CopyFrom(CopyState cstate)
ExecResetTupleTable(estate->es_tupleTable, false);
- /* Allow the FDW to shut down */
- if (target_resultRelInfo->ri_FdwRoutine != NULL)
- {
- if (target_resultRelInfo->ri_usesMultiInsert &&
- target_resultRelInfo->ri_FdwRoutine->EndForeignCopy != NULL)
- target_resultRelInfo->ri_FdwRoutine->EndForeignCopy(estate,
- target_resultRelInfo);
- else if (target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
- target_resultRelInfo->ri_FdwRoutine->EndForeignInsert(estate,
- target_resultRelInfo);
- }
-
/* Tear down the multi-insert buffer data */
CopyMultiInsertInfoCleanup(&multiInsertInfo);
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index e932bdf2f4..d807f872ba 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -105,14 +105,14 @@ typedef void (*EndForeignInsert_function) (EState *estate,
ResultRelInfo *rinfo);
typedef void (*BeginForeignCopy_function) (ModifyTableState *mtstate,
- ResultRelInfo *rinfo);
+ ResultRelInfo *rinfo);
typedef void (*EndForeignCopy_function) (EState *estate,
- ResultRelInfo *rinfo);
+ ResultRelInfo *rinfo);
typedef void (*ExecForeignCopy_function) (ResultRelInfo *rinfo,
- TupleTableSlot **slots,
- int nslots);
+ TupleTableSlot **slots,
+ int nslots);
typedef int (*IsForeignRelUpdatable_function) (Relation rel);
--
2.25.1