Hi,

One of the issues I'm fairly regularly reminded of by users/customers is
that inserting into tables sharded using FDWs is rather slow. We even
get it reported on pgsql-bugs from time to time [1].

Some of the slowness / overhead is expected, due to the latency between
machines in the sharded setup. Even just 1ms latency will make it way
more expensive than a single instance.

But let's do a simple experiment, comparing a hash-partitioned table with
regular partitions and one with FDW partitions in the same instance.
Scripts to run this are attached. The duration of inserting 1M rows into
this table (average of 10 runs on my laptop) looks like this:

  regular: 2872 ms
  FDW:     64454 ms

Yep, it's ~22x slower. And that's on a setup with ping latency well below
0.05ms. Imagine how it would look on sharded setups with 0.1ms or 1ms
latency, which is probably where most single-DC clusters are :-(
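To put rough numbers on that: if every row needs its own synchronous round
trip, the time spent just waiting on the network is at least
(number of statements) x RTT, no matter how fast either server is. Below is
a minimal sketch of that lower bound, assuming exactly one round trip per
statement and ignoring parse/execute costs; network_wait_ms() is a
hypothetical helper, not part of postgres_fdw.

```c
#include <assert.h>

/*
 * Hypothetical helper: lower bound on the time (in ms) spent waiting on
 * the network when inserting `rows` rows with `batch_size` rows per
 * statement, assuming exactly one round trip of `rtt_ms` per statement.
 * Parsing and execution time on either side are ignored.
 */
static double
network_wait_ms(long rows, int batch_size, double rtt_ms)
{
    long    statements;

    assert(rows >= 0 && batch_size > 0 && rtt_ms >= 0.0);

    /* statements = ceil(rows / batch_size) */
    statements = (rows + batch_size - 1) / batch_size;

    return (double) statements * rtt_ms;
}
```

With 1M rows and 1ms RTT this gives a floor of 1,000 s for one row per
statement, and 10 s with 100 rows per statement.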

Now, the primary reason why the performance degrades like this is that
while FDW has batching for SELECT queries (i.e. we read larger chunks of
data from the cursors), we don't have that for INSERTs (or other DML).
Every time you insert a row, it has to go all the way down into the
partition synchronously.
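The alternative to one synchronous round trip per row is to buffer the
parameter values of several rows and send them as one statement once the
buffer fills up, which is roughly what the attached PoC does with its
values / nbatched / maxbatched fields. A standalone sketch of that
bookkeeping, with hypothetical names and a counter standing in for the
actual remote call:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical row buffer: parameter values of up to batch_size rows are
 * kept in one flat array, like values[] / nbatched / maxbatched in the
 * PoC. Remote calls are replaced by a round-trip counter.
 */
typedef struct RowBatch
{
    int     ncols;          /* parameters per row */
    int     maxvalues;      /* capacity of values[]: batch_size * ncols */
    int     nvalues;        /* entries of values[] currently in use */
    char  **values;         /* copied parameter strings, NULL = SQL NULL */
    int     nroundtrips;    /* simulated round trips so far */
    long    nsent;          /* rows handed to the "remote" so far */
} RowBatch;

static char *
copy_value(const char *s)
{
    char   *copy;

    if (s == NULL)
        return NULL;
    copy = malloc(strlen(s) + 1);
    assert(copy != NULL);
    strcpy(copy, s);
    return copy;
}

static void
batch_init(RowBatch *b, int batch_size, int ncols)
{
    assert(batch_size > 0 && ncols > 0);
    b->ncols = ncols;
    b->maxvalues = batch_size * ncols;
    b->nvalues = 0;
    b->values = calloc((size_t) b->maxvalues, sizeof(char *));
    assert(b->values != NULL);
    b->nroundtrips = 0;
    b->nsent = 0;
}

/* "Send" everything buffered, costing the given number of round trips. */
static void
batch_send(RowBatch *b, int round_trips)
{
    b->nroundtrips += round_trips;
    b->nsent += b->nvalues / b->ncols;
    for (int i = 0; i < b->nvalues; i++)
    {
        free(b->values[i]);     /* free(NULL) is a no-op */
        b->values[i] = NULL;
    }
    b->nvalues = 0;
}

/* Buffer one row of ncols values; a full buffer goes out as one statement. */
static void
batch_add_row(RowBatch *b, const char *const *row)
{
    assert(b->nvalues + b->ncols <= b->maxvalues);
    for (int i = 0; i < b->ncols; i++)
        b->values[b->nvalues++] = copy_value(row[i]);
    if (b->nvalues == b->maxvalues)
        batch_send(b, 1);
}

/*
 * End of the insert: as in the PoC, leftover rows are sent one per round
 * trip through the single-row statement, since the prepared batch
 * statement has a fixed number of parameters.
 */
static void
batch_finish(RowBatch *b)
{
    batch_send(b, b->nvalues / b->ncols);
    free(b->values);
    b->values = NULL;
}
```

For 250 rows and a batch size of 100 this costs 2 round trips for the full
batches plus 50 for the remainder, instead of 250.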

For some use cases this may be mitigated by having many independent
connections from different users, so the per-user latency is higher but
acceptable. But if you need to import larger amounts of data (say, a CSV
file for analytics, ...) this may not work.

Some time ago I wrote an ugly PoC adding batching, just to see how far
it would get us, and it seems quite promising - results for the same
INSERT benchmark look like this:

   FDW batching: 4584 ms

So, rather nice improvement, I'd say ...
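For reference, the batched statement in the PoC (built by
deparseBatchInsertSql) is a multi-row VALUES list with consecutively
numbered placeholders, so a batch of N rows with K columns is one prepared
statement with N x K parameters. A simplified standalone sketch of that
string construction; build_batch_insert() is hypothetical, and it skips the
identifier quoting, ON CONFLICT and RETURNING handling that the real
deparse code deals with:

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Fixed-size output buffer that records overflow instead of truncating silently. */
typedef struct SqlBuf
{
    char   *data;
    size_t  size;
    size_t  len;
    int     overflow;
} SqlBuf;

static void
sqlbuf_append(SqlBuf *s, const char *text)
{
    size_t  n = strlen(text);

    if (s->overflow || s->len + n >= s->size)
    {
        s->overflow = 1;
        return;
    }
    memcpy(s->data + s->len, text, n + 1);  /* includes the terminating NUL */
    s->len += n;
}

/*
 * Hypothetical, simplified take on deparseBatchInsertSql: writes
 *   INSERT INTO <table> (c1, ..., cK) VALUES ($1, ..., $K), ($K+1, ...), ...
 * for batch_size rows. Returns the number of placeholders, or -1 if buf
 * is too small.
 */
static int
build_batch_insert(char *buf, size_t bufsize, const char *table,
                   const char *const *cols, int ncols, int batch_size)
{
    SqlBuf  s = {buf, bufsize, 0, 0};
    char    param[16];
    int     pindex = 1;

    assert(ncols > 0 && batch_size > 0);
    if (bufsize == 0)
        return -1;
    buf[0] = '\0';

    sqlbuf_append(&s, "INSERT INTO ");
    sqlbuf_append(&s, table);
    sqlbuf_append(&s, " (");
    for (int c = 0; c < ncols; c++)
    {
        if (c > 0)
            sqlbuf_append(&s, ", ");
        sqlbuf_append(&s, cols[c]);
    }
    sqlbuf_append(&s, ") VALUES ");

    for (int r = 0; r < batch_size; r++)
    {
        sqlbuf_append(&s, r > 0 ? ", (" : "(");
        for (int c = 0; c < ncols; c++)
        {
            if (c > 0)
                sqlbuf_append(&s, ", ");
            snprintf(param, sizeof(param), "$%d", pindex++);
            sqlbuf_append(&s, param);
        }
        sqlbuf_append(&s, ")");
    }

    return s.overflow ? -1 : pindex - 1;
}
```

For table t with columns (a, b) and a batch of 3 rows this produces
"INSERT INTO t (a, b) VALUES ($1, $2), ($3, $4), ($5, $6)".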

Before I spend more time hacking on this, I have a couple of open questions
about the design, restrictions, etc.


1) Extend the FDW API?

In the patch, the batching is simply "injected" into the existing insert
API method, i.e. ExecForeignInsert et al. I wonder if it'd be better to
extend the API with a "batched" version of the method, so that we can
easily determine whether the FDW supports batching or not - it would
require changes in the callers, though. OTOH it might be useful for
COPY, where we could do something similar to multi_insert (COPY already
benefits from this patch, but it does not use the batching built into
COPY).
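To illustrate the first option: with a separate batched callback, the
caller can tell whether an FDW supports batching with a simple NULL check,
and fall back to per-row calls otherwise. Everything below is a
hypothetical stand-in (Row, FdwRoutineSketch, insert_rows), not the actual
FdwRoutine API; a real batched method would presumably take an array of
TupleTableSlot pointers rather than Row values.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a tuple slot. */
typedef struct Row
{
    int     id;
} Row;

/*
 * Hypothetical callback table. insert_one mirrors the current per-row
 * ExecForeignInsert style; insert_batch is the optional batched variant,
 * left NULL by FDWs that do not support batching. Both return the number
 * of rows inserted.
 */
typedef struct FdwRoutineSketch
{
    int     (*insert_one) (void *state, const Row *row);
    int     (*insert_batch) (void *state, const Row *rows, int nrows);
} FdwRoutineSketch;

/* Caller side: use the batched callback when present, else one call per row. */
static int
insert_rows(const FdwRoutineSketch *fdw, void *state,
            const Row *rows, int nrows, int batch_size)
{
    int     inserted = 0;

    assert(fdw->insert_one != NULL);

    if (fdw->insert_batch != NULL && batch_size > 1)
    {
        for (int i = 0; i < nrows; i += batch_size)
        {
            int     n = (nrows - i < batch_size) ? nrows - i : batch_size;

            inserted += fdw->insert_batch(state, rows + i, n);
        }
    }
    else
    {
        for (int i = 0; i < nrows; i++)
            inserted += fdw->insert_one(state, &rows[i]);
    }
    return inserted;
}

/* Demo callbacks that only count how often they are invoked. */
typedef struct CallCounter
{
    int     calls;
} CallCounter;

static int
count_one(void *state, const Row *row)
{
    (void) row;
    ((CallCounter *) state)->calls++;
    return 1;
}

static int
count_batch(void *state, const Row *rows, int nrows)
{
    (void) rows;
    ((CallCounter *) state)->calls++;
    return nrows;
}
```

With 250 rows and a batch size of 100, the per-row table makes 250 calls
while the batched one makes 3 (100 + 100 + 50 rows).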


2) What about the insert results?

I'm not sure what to do about "result" status for the inserted rows. We
only really "stash" the rows into a buffer, so we don't know if it will
succeed or not. The patch simply assumes it will succeed, but that's
clearly wrong, and it may result in reporting a wrong number of rows.

The patch also disables the batching when the insert has a RETURNING
clause, because there's just a single slot (for the currently inserted
row). I suppose a "batching" method would take an array of slots.
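A toy model of why the optimistic count can be wrong: the PoC reports each
buffered row as inserted (the "XXX we don't know if the future insert
succeeds" branch), but with ON CONFLICT DO NOTHING the remote side may
insert fewer rows than were buffered, and that only shows up in the command
tag after the flush. The remote table and all function names below are
hypothetical; the remote statement is modelled as a plain loop.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_KEYS 1024

/* Toy remote table with a unique key, standing in for the foreign server. */
typedef struct RemoteTable
{
    int     keys[MAX_KEYS];
    int     nkeys;
} RemoteTable;

static bool
remote_contains(const RemoteTable *t, int key)
{
    for (int i = 0; i < t->nkeys; i++)
        if (t->keys[i] == key)
            return true;
    return false;
}

/*
 * Models one batched "INSERT ... ON CONFLICT DO NOTHING": returns the
 * number of rows actually inserted, i.e. what the command tag reports.
 */
static int
remote_insert_batch(RemoteTable *t, const int *keys, int n)
{
    int     inserted = 0;

    for (int i = 0; i < n; i++)
    {
        if (remote_contains(t, keys[i]))
            continue;           /* conflict: row silently skipped */
        assert(t->nkeys < MAX_KEYS);
        t->keys[t->nkeys++] = keys[i];
        inserted++;
    }
    return inserted;
}

typedef struct InsertCounts
{
    int     optimistic;         /* rows counted when buffered (PoC style) */
    int     confirmed;          /* rows the remote reported at flush time */
} InsertCounts;

static InsertCounts
insert_with_batching(RemoteTable *t, const int *keys, int n, int batch_size)
{
    InsertCounts c = {0, 0};

    for (int i = 0; i < n; i += batch_size)
    {
        int     len = (n - i < batch_size) ? n - i : batch_size;

        c.optimistic += len;
        c.confirmed += remote_insert_batch(t, keys + i, len);
    }
    return c;
}
```

Inserting keys 1..4 into a remote table that already holds key 2 counts 4
rows optimistically, while the remote reports only 3.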


3) What about the other DML operations (DELETE/UPDATE)?

The other DML operations could probably benefit from the batching too.
INSERT was good enough for a PoC, but having batching only for INSERT
seems somewhat asymmetric. DELETE/UPDATE seem more complicated because
of quals, but likely doable.


4) Should we do batching for COPY instead?

While looking at multi_insert, I've realized it's mostly exactly what
the new "batching insert" API function would need to be. But it's only
really used in COPY, so I wonder if we should just abandon the idea of
batching INSERTs and do batching COPY for FDW tables.

For cases that can replace INSERT with COPY this would be enough, but
unfortunately it does nothing for DELETE/UPDATE so I'm hesitant to do
this :-(


5) Expected consistency?

I'm not entirely sure what the consistency expectations for FDWs are.
Currently the FDW nodes pointing to the same server share a connection,
so the inserted rows might be visible to other nodes. But if we only
stash the rows in a local buffer for a while, that's no longer true. So
maybe this breaks the consistency expectations?

But maybe that's OK - I'm not sure how the prepared statements/cursors
affect this. I can imagine restricting the batching only to plans where
this is not an issue (single FDW node or something), but it seems rather
fragile and undesirable.

I was thinking about adding a GUC to enable/disable the batching at some
level (global, server, table, ...) but it seems like a bad match because
the consistency expectations likely depend on the query. There should be a
GUC to set the batch size, though (it's hardcoded to 100 for now).
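Whatever shape that setting takes, it interacts with the column count: the
batched statement has batch_size x ncolumns parameters, and libpq refuses
to send more than 65535 parameters per statement because the protocol's
Bind message carries the count as a 16-bit integer. A minimal sketch of
clamping a requested batch size accordingly (effective_batch_size() is a
hypothetical helper):

```c
#include <assert.h>

/* libpq's per-statement parameter limit (Bind uses a 16-bit count). */
#define MAX_QUERY_PARAMS 65535

/*
 * Hypothetical helper: clamp the requested batch size so that one batched
 * INSERT with ncols parameters per row stays within MAX_QUERY_PARAMS.
 * With no columns (INSERT ... DEFAULT VALUES) there is nothing to batch.
 */
static int
effective_batch_size(int requested, int ncols)
{
    int     limit;

    assert(requested >= 1 && ncols >= 0);

    if (ncols == 0)
        return 1;

    limit = MAX_QUERY_PARAMS / ncols;
    if (limit < 1)
        limit = 1;              /* guard against absurd column counts */

    return (requested < limit) ? requested : limit;
}
```

With the hardcoded 100, the clamp only kicks in for tables wider than 655
columns.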


regards



[1] https://www.postgresql.org/message-id/CACnz%2BQ1q0%2B2KoJam9LyNMk8JmdC6qYHXWB895Wu2xcpoip18xQ%40mail.gmail.com

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services

Attachment: fdw.sql
Description: application/sql

Attachment: local.sql
Description: application/sql

From df2cf502909886fbfc86f93f36b2daba03f785e4 Mon Sep 17 00:00:00 2001
From: Tomas Vondra <t...@fuzzy.cz>
Date: Sun, 28 Jun 2020 14:31:18 +0200
Subject: [PATCH] patch

---
 contrib/postgres_fdw/deparse.c      |  73 ++++++++
 contrib/postgres_fdw/postgres_fdw.c | 261 ++++++++++++++++++++++++----
 contrib/postgres_fdw/postgres_fdw.h |   5 +
 3 files changed, 305 insertions(+), 34 deletions(-)

diff --git a/contrib/postgres_fdw/deparse.c b/contrib/postgres_fdw/deparse.c
index ad37a74221..374d2f5dbb 100644
--- a/contrib/postgres_fdw/deparse.c
+++ b/contrib/postgres_fdw/deparse.c
@@ -1758,6 +1758,79 @@ deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
                                                 withCheckOptionList, returningList, retrieved_attrs);
 }
 
+/*
+ * deparse remote batch INSERT statement
+ *
+ * The statement text is appended to buf, and we also create an integer List
+ * of the columns being retrieved by WITH CHECK OPTION or RETURNING (if any),
+ * which is returned to *retrieved_attrs.
+ */
+void
+deparseBatchInsertSql(StringInfo buf, RangeTblEntry *rte,
+                                         Index rtindex, Relation rel,
+                                         List *targetAttrs, bool doNothing,
+                                         List *withCheckOptionList, List *returningList,
+                                         List **retrieved_attrs, int batchSize)
+{
+       AttrNumber      pindex;
+       bool            first;
+       ListCell   *lc;
+       int                     i;
+
+       appendStringInfoString(buf, "INSERT INTO ");
+       deparseRelation(buf, rel);
+
+       if (targetAttrs)
+       {
+               appendStringInfoChar(buf, '(');
+
+               first = true;
+               foreach(lc, targetAttrs)
+               {
+                       int                     attnum = lfirst_int(lc);
+
+                       if (!first)
+                               appendStringInfoString(buf, ", ");
+                       first = false;
+
+                       deparseColumnRef(buf, rtindex, attnum, rte, false);
+               }
+
+               appendStringInfoString(buf, ") VALUES ");
+
+               pindex = 1;
+               for (i = 0; i < batchSize; i++)
+               {
+                       if (i > 0)
+                               appendStringInfoString(buf, ", ");
+
+                       appendStringInfoString(buf, "(");
+
+                       first = true;
+                       foreach(lc, targetAttrs)
+                       {
+                               if (!first)
+                                       appendStringInfoString(buf, ", ");
+                               first = false;
+
+                               appendStringInfo(buf, "$%d", pindex);
+                               pindex++;
+                       }
+
+                       appendStringInfoChar(buf, ')');
+               }
+       }
+       else
+               appendStringInfoString(buf, " DEFAULT VALUES");
+
+       if (doNothing)
+               appendStringInfoString(buf, " ON CONFLICT DO NOTHING");
+
+       deparseReturningList(buf, rte, rtindex, rel,
+                                                rel->trigdesc && rel->trigdesc->trig_insert_after_row,
+                                                withCheckOptionList, returningList, retrieved_attrs);
+}
+
 /*
  * deparse remote UPDATE statement
  *
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 9fc53cad68..17421f6b65 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -56,6 +56,8 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+#define BATCH_SIZE                                     100
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -93,6 +95,8 @@ enum FdwModifyPrivateIndex
 {
        /* SQL statement to execute remotely (as a String node) */
        FdwModifyPrivateUpdateSql,
+       /* batched INSERT statement to execute remotely (as a String node) */
+       FdwModifyPrivateUpdateBatchSql,
        /* Integer list of target attribute numbers for INSERT/UPDATE */
        FdwModifyPrivateTargetAttnums,
        /* has-returning flag (as an integer Value node) */
@@ -172,9 +176,11 @@ typedef struct PgFdwModifyState
        /* for remote query execution */
        PGconn     *conn;                       /* connection for the scan */
        char       *p_name;                     /* name of prepared statement, if created */
+       char       *p_name_batch;       /* name of prepared batch statement, if created */
 
        /* extracted fdw_private data */
        char       *query;                      /* text of INSERT/UPDATE/DELETE command */
+       char       *batch_query;        /* text of batched INSERT command */
        List       *target_attrs;       /* list of target attribute numbers */
        bool            has_returning;  /* is there a RETURNING clause? */
        List       *retrieved_attrs;    /* attr numbers retrieved by RETURNING */
@@ -187,6 +193,12 @@ typedef struct PgFdwModifyState
        /* working memory context */
        MemoryContext temp_cxt;         /* context for per-tuple temporary data */
 
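+       /* batching of values */
+       MemoryContext batch_cxt;        /* context for copies of batched values */
+       int                     maxbatched;     /* capacity of values[] */
+       int                     nbatched;       /* number of values[] entries in use */
+       const char  **values;           /* buffered parameter values */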
+
        /* for update row movement if subplan result rel */
        struct PgFdwModifyState *aux_fmstate;   /* foreign-insert state, if
                                                 * created */
@@ -427,6 +439,7 @@ static PgFdwModifyState *create_foreign_modify(EState *estate,
                                                   CmdType operation,
                                                   Plan *subplan,
                                                   char *query,
+                                                  char *batch_query,
                                                   List *target_attrs,
                                                   bool has_returning,
                                                   List *retrieved_attrs);
@@ -435,6 +448,11 @@ static TupleTableSlot *execute_foreign_modify(EState *estate,
                                                      CmdType operation,
                                                      TupleTableSlot *slot,
                                                      TupleTableSlot *planSlot);
+static TupleTableSlot *flush_foreign_modify(EState *estate,
+                                            ResultRelInfo *resultRelInfo,
+                                            CmdType operation,
+                                            TupleTableSlot *slot,
+                                            TupleTableSlot *planSlot);
 static void prepare_foreign_modify(PgFdwModifyState *fmstate);
 static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
                                              ItemPointer tupleid,
@@ -1659,13 +1677,16 @@ postgresPlanForeignModify(PlannerInfo *root,
        RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
        Relation        rel;
        StringInfoData sql;
+       StringInfoData batch_sql;
        List       *targetAttrs = NIL;
        List       *withCheckOptionList = NIL;
        List       *returningList = NIL;
        List       *retrieved_attrs = NIL;
        bool            doNothing = false;
+       List       *priv = NIL;
 
        initStringInfo(&sql);
+       initStringInfo(&batch_sql);
 
        /*
         * Core code already has some lock on each rel being planned, so we can
@@ -1752,6 +1773,11 @@ postgresPlanForeignModify(PlannerInfo *root,
                                                         targetAttrs, doNothing,
                                                         withCheckOptionList, returningList,
                                                         &retrieved_attrs);
+
+                       deparseBatchInsertSql(&batch_sql, rte, resultRelation, rel,
+                                                                 targetAttrs, doNothing,
+                                                                 withCheckOptionList, returningList,
+                                                                 &retrieved_attrs, BATCH_SIZE);
                        break;
                case CMD_UPDATE:
                        deparseUpdateSql(&sql, rte, resultRelation, rel,
@@ -1775,10 +1801,13 @@ postgresPlanForeignModify(PlannerInfo *root,
         * Build the fdw_private list that will be available to the executor.
         * Items in the list must match enum FdwModifyPrivateIndex, above.
         */
-       return list_make4(makeString(sql.data),
-                                         targetAttrs,
-                                         makeInteger((retrieved_attrs != NIL)),
-                                         retrieved_attrs);
+       priv = lappend(priv, makeString(sql.data));
+       priv = lappend(priv, makeString(batch_sql.data));
+       priv = lappend(priv, targetAttrs);
+       priv = lappend(priv, makeInteger((retrieved_attrs != NIL)));
+       priv = lappend(priv, retrieved_attrs);
+
+       return priv;
 }
 
 /*
@@ -1794,6 +1823,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
 {
        PgFdwModifyState *fmstate;
        char       *query;
+       char       *batch_query;
        List       *target_attrs;
        bool            has_returning;
        List       *retrieved_attrs;
@@ -1809,6 +1839,8 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
        /* Deconstruct fdw_private data. */
        query = strVal(list_nth(fdw_private,
                                FdwModifyPrivateUpdateSql));
+       batch_query = strVal(list_nth(fdw_private,
+                                     FdwModifyPrivateUpdateBatchSql));
        target_attrs = (List *) list_nth(fdw_private,
                                         FdwModifyPrivateTargetAttnums);
        has_returning = intVal(list_nth(fdw_private,
@@ -1827,6 +1859,7 @@ postgresBeginForeignModify(ModifyTableState *mtstate,
                                                                        mtstate->operation,
                                                                        mtstate->mt_plans[subplan_index]->plan,
                                                                        query,
+                                                                       batch_query,
                                                                        target_attrs,
                                                                        has_returning,
                                                                        retrieved_attrs);
@@ -1925,6 +1958,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
        TupleDesc       tupdesc = RelationGetDescr(rel);
        int                     attnum;
        StringInfoData sql;
+       StringInfoData batch_sql;
        List       *targetAttrs = NIL;
        List       *retrieved_attrs = NIL;
        bool            doNothing = false;
@@ -1946,6 +1980,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                                                RelationGetRelationName(rel))));
 
        initStringInfo(&sql);
+       initStringInfo(&batch_sql);
 
        /* We transmit all columns that are defined in the foreign table. */
        for (attnum = 1; attnum <= tupdesc->natts; attnum++)
@@ -2002,6 +2037,12 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                                         resultRelInfo->ri_returningList,
                                         &retrieved_attrs);
 
+       /* Construct the batched SQL command string. */
+       deparseBatchInsertSql(&batch_sql, rte, resultRelation, rel, targetAttrs, doNothing,
+                                        resultRelInfo->ri_WithCheckOptions,
+                                        resultRelInfo->ri_returningList,
+                                        &retrieved_attrs, BATCH_SIZE);
+
        /* Construct an execution state. */
        fmstate = create_foreign_modify(mtstate->ps.state,
                                                                        rte,
@@ -2009,6 +2050,7 @@ postgresBeginForeignInsert(ModifyTableState *mtstate,
                                                                        CMD_INSERT,
                                                                        NULL,
                                                                        sql.data,
+                                                                       batch_sql.data,
                                                                        targetAttrs,
                                                                        retrieved_attrs != NIL,
                                                                        retrieved_attrs);
@@ -2040,6 +2082,9 @@ postgresEndForeignInsert(EState *estate,
 
        Assert(fmstate != NULL);
 
+       /* Send over remaining data to insert. */
+       flush_foreign_modify(estate, resultRelInfo, CMD_INSERT, NULL, NULL);
+
        /*
         * If the fmstate has aux_fmstate set, get the aux_fmstate (see
         * postgresBeginForeignInsert())
@@ -3536,6 +3581,7 @@ create_foreign_modify(EState *estate,
                                          CmdType operation,
                                          Plan *subplan,
                                          char *query,
+                                         char *batch_query,
                                          List *target_attrs,
                                          bool has_returning,
                                          List *retrieved_attrs)
@@ -3571,15 +3617,24 @@ create_foreign_modify(EState *estate,
 
        /* Set up remote query information. */
        fmstate->query = query;
+       fmstate->batch_query = batch_query;
        fmstate->target_attrs = target_attrs;
        fmstate->has_returning = has_returning;
        fmstate->retrieved_attrs = retrieved_attrs;
 
+       fmstate->nbatched = 0;
+       fmstate->maxbatched = BATCH_SIZE * list_length(target_attrs);
+       fmstate->values = palloc(fmstate->maxbatched * sizeof(char *));
+
        /* Create context for per-tuple temp workspace. */
        fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
                                                  "postgres_fdw temporary data",
                                                  ALLOCSET_SMALL_SIZES);
 
+       fmstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
+                                                  "postgres_fdw batch data",
+                                                  ALLOCSET_DEFAULT_SIZES);
+
        /* Prepare for input conversion of RETURNING results. */
        if (fmstate->has_returning)
                fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
@@ -3646,7 +3701,9 @@ execute_foreign_modify(EState *estate,
        ItemPointer ctid = NULL;
        const char **p_values;
        PGresult   *res;
-       int                     n_rows;
+       int                     n_rows = 0;
+       MemoryContext   oldctx;
+       int                     i;
 
        /* The operation should be INSERT, UPDATE, or DELETE */
        Assert(operation == CMD_INSERT ||
@@ -3677,48 +3734,163 @@ execute_foreign_modify(EState *estate,
        /* Convert parameters needed by prepared statement to text form */
        p_values = convert_prep_stmt_params(fmstate, ctid, slot);
 
-       /*
-        * Execute the prepared statement.
-        */
-       if (!PQsendQueryPrepared(fmstate->conn,
-                                                        fmstate->p_name,
-                                                        fmstate->p_nums,
-                                                        p_values,
-                                                        NULL,
-                                                        NULL,
-                                                        0))
-               pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+       /* copy the parameters to the batch */
+       oldctx = MemoryContextSwitchTo(fmstate->batch_cxt);
+
+       for (i = 0; i < fmstate->p_nums; i++)
+               if (p_values[i] == NULL)
+                       fmstate->values[fmstate->nbatched++] = NULL;
+               else
+                       fmstate->values[fmstate->nbatched++] = pstrdup(p_values[i]);
+
+       MemoryContextSwitchTo(oldctx);
+
+       Assert(fmstate->nbatched <= fmstate->maxbatched);
+
+       /* if the batch is "full" we need to flush it */
+       if (fmstate->nbatched == fmstate->maxbatched || fmstate->has_returning)
+       {
+               /*
+                * Execute the prepared statement.
+                */
+               if (fmstate->has_returning)
+               {
+                       if (!PQsendQueryPrepared(fmstate->conn,
+                                                                fmstate->p_name,
+                                                                fmstate->nbatched,
+                                                                fmstate->values,
+                                                                NULL,
+                                                                NULL,
+                                                                0))
+                               pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+               }
+               else if (!PQsendQueryPrepared(fmstate->conn,
+                                                                fmstate->p_name_batch,
+                                                                fmstate->nbatched,
+                                                                fmstate->values,
+                                                                NULL,
+                                                                NULL,
+                                                                0))
+                       pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->batch_query);
+
+               /*
+                * Get the result, and check for success.
+                *
+                * We don't use a PG_TRY block here, so be careful not to throw error
+                * without releasing the PGresult.
+                */
+               res = pgfdw_get_result(fmstate->conn, fmstate->query);
+               if (PQresultStatus(res) !=
+                       (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+                       pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+
+               /* Check number of rows affected, and fetch RETURNING tuple if any */
+               if (fmstate->has_returning)
+               {
+                       n_rows = PQntuples(res);
+                       if (n_rows > 0)
+                               store_returning_result(fmstate, slot, res);
+               }
+               else
+                       n_rows = atoi(PQcmdTuples(res));
+
+               /* And clean up */
+               PQclear(res);
+
+               MemoryContextReset(fmstate->batch_cxt);
+
+               fmstate->nbatched = 0;
+       }
+       else
+               /* XXX we don't know if the future insert succeeds */
+               n_rows = 1;
+
+       MemoryContextReset(fmstate->temp_cxt);
 
        /*
-        * Get the result, and check for success.
-        *
-        * We don't use a PG_TRY block here, so be careful not to throw error
-        * without releasing the PGresult.
+        * Return NULL if nothing was inserted/updated/deleted on the remote end
         */
-       res = pgfdw_get_result(fmstate->conn, fmstate->query);
-       if (PQresultStatus(res) !=
-               (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
-               pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+       return (n_rows > 0) ? slot : NULL;
+}
 
-       /* Check number of rows affected, and fetch RETURNING tuple if any */
-       if (fmstate->has_returning)
+/*
+ * flush_foreign_modify
+ *             Send any rows still buffered in the batch to the remote server,
+ *             one row per round trip using the single-row prepared statement.
+ *             (Called from postgresEndForeignInsert before cleanup.)
+ */
+static TupleTableSlot *
+flush_foreign_modify(EState *estate,
+                                          ResultRelInfo *resultRelInfo,
+                                          CmdType operation,
+                                          TupleTableSlot *slot,
+                                          TupleTableSlot *planSlot)
+{
+       PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+       PGresult   *res;
+       int                     n_rows = 0;
+       int                     i;
+
+       /* The operation should be INSERT, UPDATE, or DELETE */
+       Assert(operation == CMD_INSERT ||
+                  operation == CMD_UPDATE ||
+                  operation == CMD_DELETE);
+
+       /* Set up the prepared statement on the remote server, if we didn't yet */
+       if (!fmstate->p_name)
+               prepare_foreign_modify(fmstate);
+
+       /* send the remaining buffered rows, one row per round trip */
+       i = 0;
+       while (i < fmstate->nbatched)
        {
-               n_rows = PQntuples(res);
-               if (n_rows > 0)
-                       store_returning_result(fmstate, slot, res);
+               /*
+                * Execute the prepared statement.
+                */
+               if (!PQsendQueryPrepared(fmstate->conn,
+                                                                fmstate->p_name,
+                                                                fmstate->p_nums,
+                                                                &fmstate->values[i],
+                                                                NULL,
+                                                                NULL,
+                                                                0))
+                       pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+
+               /*
+                * Get the result, and check for success.
+                *
+                * We don't use a PG_TRY block here, so be careful not to throw error
+                * without releasing the PGresult.
+                */
+               res = pgfdw_get_result(fmstate->conn, fmstate->query);
+               if (PQresultStatus(res) !=
+                       (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+                       pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+
+               /* Check number of rows affected, and fetch RETURNING tuple if any */
+               if (fmstate->has_returning)
+               {
+                       n_rows = PQntuples(res);
+                       if (n_rows > 0)
+                               store_returning_result(fmstate, slot, res);
+               }
+               else
+                       n_rows = atoi(PQcmdTuples(res));
+
+               /* And clean up */
+               PQclear(res);
+
+               i += fmstate->p_nums;
        }
-       else
-               n_rows = atoi(PQcmdTuples(res));
 
-       /* And clean up */
-       PQclear(res);
+       Assert(i == fmstate->nbatched);
 
        MemoryContextReset(fmstate->temp_cxt);
 
        /*
         * Return NULL if nothing was inserted/updated/deleted on the remote end
         */
-       return (n_rows > 0) ? slot : NULL;
+       return NULL;
 }
 
 /*
@@ -3729,7 +3901,9 @@ static void
 prepare_foreign_modify(PgFdwModifyState *fmstate)
 {
        char            prep_name[NAMEDATALEN];
+       char            prep_name_batch[NAMEDATALEN];
        char       *p_name;
+       char       *p_name_batch;
        PGresult   *res;
 
        /* Construct name we'll use for the prepared statement. */
@@ -3737,6 +3911,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
                         GetPrepStmtNumber(fmstate->conn));
        p_name = pstrdup(prep_name);
 
+       /* Construct name we'll use for the batch prepared statement. */
+       snprintf(prep_name_batch, sizeof(prep_name_batch), "pgsql_fdw_prep_%u",
+                        GetPrepStmtNumber(fmstate->conn));
+       p_name_batch = pstrdup(prep_name_batch);
+
        /*
         * We intentionally do not specify parameter types here, but leave the
         * remote server to derive them by default.  This avoids possible problems
@@ -3762,8 +3941,22 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
                pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
        PQclear(res);
 
+       if (fmstate->batch_query &&
+               !PQsendPrepare(fmstate->conn,
+                                          p_name_batch,
+                                          fmstate->batch_query,
+                                          0,
+                                          NULL))
+               pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->batch_query);
+
+       res = pgfdw_get_result(fmstate->conn, fmstate->batch_query);
+       if (PQresultStatus(res) != PGRES_COMMAND_OK)
+               pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->batch_query);
+       PQclear(res);
+
        /* This action shows that the prepare has been done. */
        fmstate->p_name = p_name;
+       fmstate->p_name_batch = p_name_batch;
 }
 
 /*
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index eef410db39..7e4342cab6 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -162,6 +162,11 @@ extern void deparseInsertSql(StringInfo buf, RangeTblEntry *rte,
                                                         List *targetAttrs, bool doNothing,
                                                         List *withCheckOptionList, List *returningList,
                                                         List **retrieved_attrs);
+extern void deparseBatchInsertSql(StringInfo buf, RangeTblEntry *rte,
+                                                        Index rtindex, Relation rel,
+                                                        List *targetAttrs, bool doNothing,
+                                                        List *withCheckOptionList, List *returningList,
+                                                        List **retrieved_attrs, int batchSize);
 extern void deparseUpdateSql(StringInfo buf, RangeTblEntry *rte,
                                                         Index rtindex, Relation rel,
                                                         List *targetAttrs,
-- 
2.25.4
