Attached is a WIP patch for the following:

/*
 * postgresPlanForeignModify
 *      Plan an insert/update/delete operation on a foreign table
 *
 * Note: currently, the plan tree generated for UPDATE/DELETE will always
 * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
 * and then the ModifyTable node will have to execute individual remote
 * UPDATE/DELETE commands.  If there are no local conditions or joins
 * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
 * and then do nothing at ModifyTable.  Room for future optimization ...
 */

In the patch, postgresPlanForeignModify has been modified so that if, in
addition to the above condition, the following conditions are satisfied, the
ForeignScan and ModifyTable nodes will work that way.

 - There are no local BEFORE/AFTER triggers.
 - For UPDATE, the expressions assigned to the target columns are safe to
evaluate on the remote server.

Here is a simple performance test.

On remote side:
postgres=# create table t (id serial primary key, inserted timestamp
default clock_timestamp(), data text);
CREATE TABLE
postgres=# insert into t(data) select random() from generate_series(0,
99999);
INSERT 0 100000
postgres=# vacuum t;
VACUUM

On local side:
postgres=# create foreign table ft (id integer, inserted timestamp, data
text) server myserver options (table_name 't');
CREATE FOREIGN TABLE

Unpatched:
postgres=# explain analyze verbose delete from ft where id < 10000;
                                                      QUERY PLAN
-----------------------------------------------------------------------------------------------------------------------
 Delete on public.ft  (cost=100.00..162.32 rows=910 width=6) (actual
time=1275.255..1275.255 rows=0 loops=1)
   Remote SQL: DELETE FROM public.t WHERE ctid = $1
   ->  Foreign Scan on public.ft  (cost=100.00..162.32 rows=910 width=6)
(actual time=1.180..52.095 rows=9999 loops=1)
         Output: ctid
         Remote SQL: SELECT ctid FROM public.t WHERE ((id < 10000)) FOR
UPDATE
 Planning time: 0.112 ms
 Execution time: 1275.733 ms
(7 rows)

Patched (Note that the DELETE command has been pushed down.):
postgres=# explain analyze verbose delete from ft where id < 10000;
                                                    QUERY PLAN
-------------------------------------------------------------------------------------------------------------------
 Delete on public.ft  (cost=100.00..162.32 rows=910 width=6) (actual
time=0.006..0.006 rows=0 loops=1)
   ->  Foreign Scan on public.ft  (cost=100.00..162.32 rows=910 width=6)
(actual time=0.001..0.001 rows=0 loops=1)
         Output: ctid
         Remote SQL: DELETE FROM public.t WHERE ((id < 10000))
 Planning time: 0.101 ms
 Execution time: 8.808 ms
(6 rows)

I'll add this to the next CF.  Comments are welcome.

Thanks,

Best regards,
Etsuro Fujita
*** a/contrib/postgres_fdw/deparse.c
--- b/contrib/postgres_fdw/deparse.c
***************
*** 189,198 **** is_foreign_expr(PlannerInfo *root,
        if (!foreign_expr_walker((Node *) expr, &glob_cxt, &loc_cxt))
                return false;
  
-       /* Expressions examined here should be boolean, ie noncollatable */
-       Assert(loc_cxt.collation == InvalidOid);
-       Assert(loc_cxt.state == FDW_COLLATE_NONE);
- 
        /*
         * An expression which includes any mutable functions can't be sent over
         * because its result is not stable.  For example, sending now() remote
--- 189,194 ----
***************
*** 928,933 **** deparseUpdateSql(StringInfo buf, PlannerInfo *root,
--- 924,982 ----
  }
  
  /*
+  * deparse remote UPDATE statement for direct (pushed-down) execution
+  *
+  * The SET expressions and remote_conds are deparsed into the statement text,
+  * which is appended to buf.  We also create an integer List of the columns
+  * being retrieved by RETURNING (if any), which is returned to *retrieved_attrs.
+  */
+ void
+ deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
+                                          Index rtindex, Relation rel,
+                                          List *remote_conds,
+                                          List *targetlist,
+                                          List *targetAttrs, List *returningList,
+                                          List **retrieved_attrs)
+ {
+       RelOptInfo *baserel = root->simple_rel_array[rtindex];
+       List       *params_list = NIL;  /* handed to appendWhereClause only */
+       deparse_expr_cxt context;
+       bool            first = true;   /* no ", " before the first SET item */
+       ListCell   *lc;
+ 
+       /* Set up context struct used to deparse the SET expressions */
+       context.root = root;
+       context.foreignrel = baserel;
+       context.buf = buf;
+       context.params_list = NULL;
+ 
+       appendStringInfoString(buf, "UPDATE ");
+       deparseRelation(buf, rel);
+       appendStringInfoString(buf, " SET ");
+ 
+       foreach(lc, targetAttrs)
+       {
+               int                     attnum = lfirst_int(lc);
+               TargetEntry *tle = get_tle_by_resno(targetlist, attnum);
+ 
+               if (!tle)                       /* never dereference a missing entry */
+                       elog(ERROR, "attribute number %d not found in UPDATE targetlist", attnum);
+               if (!first)
+                       appendStringInfoString(buf, ", ");
+               first = false;
+               deparseColumnRef(buf, rtindex, attnum, root);
+               appendStringInfoString(buf, " = ");     /* constant text, no format */
+               deparseExpr((Expr *) tle->expr, &context);
+       }
+       if (remote_conds)               /* WHERE clause only for pushed-down quals */
+               appendWhereClause(buf, root, baserel, remote_conds,
+                                                 true, &params_list);
+ 
+       deparseReturningList(buf, root, rtindex, rel, false,
+                                                returningList, retrieved_attrs);
+ }
+ 
+ /*
   * deparse remote DELETE statement
   *
   * The statement text is appended to buf, and we also create an integer List
***************
*** 950,955 **** deparseDeleteSql(StringInfo buf, PlannerInfo *root,
--- 999,1031 ----
  }
  
  /*
+  * deparse remote DELETE statement for direct (pushed-down) execution
+  *
+  * remote_conds are deparsed into the WHERE clause of the statement text,
+  * which is appended to buf.  We also create an integer List of the columns
+  * being retrieved by RETURNING (if any), which is returned to *retrieved_attrs.
+  */
+ void
+ deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
+                                          Index rtindex, Relation rel,
+                                          List *remote_conds,
+                                          List *returningList,
+                                          List **retrieved_attrs)
+ {
+       RelOptInfo *baserel = root->simple_rel_array[rtindex];
+       List       *params_list = NIL;  /* passed to appendWhereClause; not read afterwards */
+       /* Append "DELETE FROM <rel>", then WHERE (if any conds) and RETURNING (if needed). */
+       appendStringInfoString(buf, "DELETE FROM ");
+       deparseRelation(buf, rel);
+       if (remote_conds)               /* skip the WHERE clause when there are no conditions */
+               appendWhereClause(buf, root, baserel, remote_conds,
+                                                 true, &params_list);
+ 
+       deparseReturningList(buf, root, rtindex, rel, false,
+                                                returningList, 
retrieved_attrs);
+ }
+ 
+ /*
   * Add a RETURNING clause, if needed, to an INSERT/UPDATE/DELETE.
   */
  static void
*** a/contrib/postgres_fdw/expected/postgres_fdw.out
--- b/contrib/postgres_fdw/expected/postgres_fdw.out
***************
*** 1124,1138 **** UPDATE ft2 SET c2 = ft2.c2 + 500, c3 = ft2.c3 || '_update9', 
c7 = DEFAULT
    FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
  EXPLAIN (verbose, costs off)
    DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
!                                        QUERY PLAN                             
          
! 
----------------------------------------------------------------------------------------
   Delete on public.ft2
     Output: c1, c4
-    Remote SQL: DELETE FROM "S 1"."T 1" WHERE ctid = $1 RETURNING "C 1", c4
     ->  Foreign Scan on public.ft2
           Output: ctid
!          Remote SQL: SELECT ctid FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) 
FOR UPDATE
! (6 rows)
  
  DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
    c1  |              c4              
--- 1124,1137 ----
    FROM ft1 WHERE ft1.c1 = ft2.c2 AND ft1.c1 % 10 = 9;
  EXPLAIN (verbose, costs off)
    DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
!                                          QUERY PLAN                           
              
! 
--------------------------------------------------------------------------------------------
   Delete on public.ft2
     Output: c1, c4
     ->  Foreign Scan on public.ft2
           Output: ctid
!          Remote SQL: DELETE FROM "S 1"."T 1" WHERE ((("C 1" % 10) = 5)) 
RETURNING "C 1", c4
! (5 rows)
  
  DELETE FROM ft2 WHERE c1 % 10 = 5 RETURNING c1, c4;
    c1  |              c4              
***************
*** 2227,2233 **** CONTEXT:  Remote SQL command: INSERT INTO "S 1"."T 1"("C 1", 
c2, c3, c4, c5, c6,
  UPDATE ft1 SET c2 = -c2 WHERE c1 = 1;  -- c2positive
  ERROR:  new row for relation "T 1" violates check constraint "c2positive"
  DETAIL:  Failing row contains (1, -1, 00001_trig_update, 1970-01-02 
08:00:00+00, 1970-01-02 00:00:00, 1, 1         , foo).
! CONTEXT:  Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1
  -- Test savepoint/rollback behavior
  select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
   c2  | count 
--- 2226,2232 ----
  UPDATE ft1 SET c2 = -c2 WHERE c1 = 1;  -- c2positive
  ERROR:  new row for relation "T 1" violates check constraint "c2positive"
  DETAIL:  Failing row contains (1, -1, 00001_trig_update, 1970-01-02 
08:00:00+00, 1970-01-02 00:00:00, 1, 1         , foo).
! CONTEXT:  Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (- c2) WHERE (("C 
1" = 1))
  -- Test savepoint/rollback behavior
  select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
   c2  | count 
***************
*** 2386,2392 **** savepoint s3;
  update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side
  ERROR:  new row for relation "T 1" violates check constraint "c2positive"
  DETAIL:  Failing row contains (10, -2, 00010_trig_update_trig_update, 
1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0         , foo).
! CONTEXT:  Remote SQL command: UPDATE "S 1"."T 1" SET c2 = $2 WHERE ctid = $1
  rollback to savepoint s3;
  select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
   c2  | count 
--- 2385,2391 ----
  update ft2 set c2 = -2 where c2 = 42 and c1 = 10; -- fail on remote side
  ERROR:  new row for relation "T 1" violates check constraint "c2positive"
  DETAIL:  Failing row contains (10, -2, 00010_trig_update_trig_update, 
1970-01-11 08:00:00+00, 1970-01-11 00:00:00, 0, 0         , foo).
! CONTEXT:  Remote SQL command: UPDATE "S 1"."T 1" SET c2 = (-2) WHERE ((c2 = 
42)) AND (("C 1" = 10))
  rollback to savepoint s3;
  select c2, count(*) from ft2 where c2 < 500 group by 1 order by 1;
   c2  | count 
*** a/contrib/postgres_fdw/postgres_fdw.c
--- b/contrib/postgres_fdw/postgres_fdw.c
***************
*** 87,93 **** typedef struct PgFdwRelationInfo
   * planner to executor.  Currently we store:
   *
   * 1) SELECT statement text to be sent to the remote server
!  * 2) Integer list of attribute numbers retrieved by the SELECT
   *
   * These items are indexed with the enum FdwScanPrivateIndex, so an item
   * can be fetched with list_nth().  For example, to get the SELECT statement:
--- 87,98 ----
   * planner to executor.  Currently we store:
   *
   * 1) SELECT statement text to be sent to the remote server
!  * 2) List of restriction clauses that can be executed remotely
!  * 3) Integer list of attribute numbers retrieved by the SELECT
!  * 4) UPDATE/DELETE statement text to be sent to the remote server
!  * 5) Boolean flag showing if we set the command es_processed
!  * 6) Boolean flag showing if the remote query has a RETURNING clause
!  * 7) Integer list of attribute numbers retrieved by RETURNING, if any
   *
   * These items are indexed with the enum FdwScanPrivateIndex, so an item
   * can be fetched with list_nth().  For example, to get the SELECT statement:
***************
*** 97,106 **** enum FdwScanPrivateIndex
  {
        /* SQL statement to execute remotely (as a String node) */
        FdwScanPrivateSelectSql,
!       /* Integer list of attribute numbers retrieved by the SELECT */
!       FdwScanPrivateRetrievedAttrs
  };
  
  /*
   * Similarly, this enum describes what's kept in the fdw_private list for
   * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
--- 102,124 ----
  {
        /* SQL statement to execute remotely (as a String node) */
        FdwScanPrivateSelectSql,
!       /* List of restriction clauses that can be executed remotely */
!       FdwScanPrivateRemoteConds,
!       /* Integer list of attribute numbers retrieved by SELECT */
!       FdwScanPrivateRetrievedAttrsBySelect,
!       /* UPDATE/DELETE statement to execute remotely (as a String node) */
!       FdwScanPrivateUpdateSql,
!       /* set-processed flag (as an integer Value node) */
!       FdwScanPrivateSetProcessed,
!       /* has-returning flag (as an integer Value node) */
!       FdwScanPrivateHasReturning,
!       /* Integer list of attribute numbers retrieved by RETURNING */
!       FdwScanPrivateRetrievedAttrsByReturning
  };
  
+ #define MinFdwScanFdwPrivateLength                                    3
+ #define MaxFdwScanFdwPrivateLength                                    7
+ 
  /*
   * Similarly, this enum describes what's kept in the fdw_private list for
   * a ModifyTable node referencing a postgres_fdw foreign table.  We store:
***************
*** 132,139 **** typedef struct PgFdwScanState
        AttInMetadata *attinmeta;       /* attribute datatype conversion 
metadata */
  
        /* extracted fdw_private data */
!       char       *query;                      /* text of SELECT command */
        List       *retrieved_attrs;    /* list of retrieved attribute numbers 
*/
  
        /* for remote query execution */
        PGconn     *conn;                       /* connection for the scan */
--- 150,159 ----
        AttInMetadata *attinmeta;       /* attribute datatype conversion 
metadata */
  
        /* extracted fdw_private data */
!       char       *query;                      /* text of SELECT or 
UPDATE/DELETE command */
        List       *retrieved_attrs;    /* list of retrieved attribute numbers 
*/
+       bool            set_processed;  /* do we set the command es_processed? 
*/
+       bool            has_returning;  /* is there a RETURNING clause? */
  
        /* for remote query execution */
        PGconn     *conn;                       /* connection for the scan */
***************
*** 153,158 **** typedef struct PgFdwScanState
--- 173,183 ----
        int                     fetch_ct_2;             /* Min(# of fetches 
done, 2) */
        bool            eof_reached;    /* true if last fetch reached EOF */
  
+       /* for direct update */
+       bool            direct_update;  /* do we update the foreign table 
directly? */
+       PGresult   *result;                     /* result of an UPDATE/DELETE 
query */;
+       TupleTableSlot *rslot;          /* slot containing the result tuple */
+ 
        /* working memory contexts */
        MemoryContext batch_cxt;        /* context holding current batch of 
tuples */
        MemoryContext temp_cxt;         /* context for per-tuple temporary data 
*/
***************
*** 181,186 **** typedef struct PgFdwModifyState
--- 206,215 ----
        int                     p_nums;                 /* number of parameters 
to transmit */
        FmgrInfo   *p_flinfo;           /* output conversion functions for them 
*/
  
+       /* for direct update */
+       bool            direct_update;  /* do we update the foreign table 
directly? */
+       PgFdwScanState *fsstate;        /* execution state of a foreign scan */
+ 
        /* working memory context */
        MemoryContext temp_cxt;         /* context for per-tuple temporary data 
*/
  } PgFdwModifyState;
***************
*** 307,318 **** static bool ec_member_matches_foreign(PlannerInfo *root, 
RelOptInfo *rel,
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
  static void prepare_foreign_modify(PgFdwModifyState *fmstate);
  static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
                                                 ItemPointer tupleid,
                                                 TupleTableSlot *slot);
! static void store_returning_result(PgFdwModifyState *fmstate,
!                                          TupleTableSlot *slot, PGresult *res);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
                                                          HeapTuple *rows, int 
targrows,
                                                          double *totalrows,
--- 336,364 ----
  static void create_cursor(ForeignScanState *node);
  static void fetch_more_data(ForeignScanState *node);
  static void close_cursor(PGconn *conn, unsigned int cursor_number);
+ static bool check_direct_update(PlannerInfo *root,
+                                                               ModifyTable 
*plan,
+                                                               Index 
resultRelation,
+                                                               int 
subplan_index,
+                                                               Relation rel,
+                                                               List 
*targetAttrs);
+ static List *plan_direct_update(PlannerInfo *root,
+                                                               ModifyTable 
*plan,
+                                                               Index 
resultRelation,
+                                                               int 
subplan_index,
+                                                               Relation rel,
+                                                               List 
*targetAttrs);
  static void prepare_foreign_modify(PgFdwModifyState *fmstate);
  static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
                                                 ItemPointer tupleid,
                                                 TupleTableSlot *slot);
! static void store_returning_result(TupleTableSlot *slot,
!                                          PGresult *res,
!                                          int row,
!                                          Relation rel,
!                                          AttInMetadata *attinmeta,
!                                          List *retrieved_attrs,
!                                          MemoryContext temp_context);
  static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
                                                          HeapTuple *rows, int 
targrows,
                                                          double *totalrows,
***************
*** 848,854 **** postgresGetForeignPlan(PlannerInfo *root,
         * Build the fdw_private list that will be available to the executor.
         * Items in the list must match enum FdwScanPrivateIndex, above.
         */
!       fdw_private = list_make2(makeString(sql.data),
                                                         retrieved_attrs);
  
        /*
--- 894,901 ----
         * Build the fdw_private list that will be available to the executor.
         * Items in the list must match enum FdwScanPrivateIndex, above.
         */
!       fdw_private = list_make3(makeString(sql.data),
!                                                        remote_conds,
                                                         retrieved_attrs);
  
        /*
***************
*** 910,931 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
        server = GetForeignServer(table->serverid);
        user = GetUserMapping(userid, server->serverid);
  
        /*
         * Get connection to the foreign server.  Connection manager will
         * establish new connection if necessary.
         */
        fsstate->conn = GetConnection(server, user, false);
  
-       /* Assign a unique ID for my cursor */
-       fsstate->cursor_number = GetCursorNumber(fsstate->conn);
-       fsstate->cursor_exists = false;
- 
-       /* Get private info created by planner functions. */
-       fsstate->query = strVal(list_nth(fsplan->fdw_private,
-                                                                        
FdwScanPrivateSelectSql));
-       fsstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
-                                                                               
           FdwScanPrivateRetrievedAttrs);
- 
        /* Create contexts for batches of tuples and per-tuple temp workspace. 
*/
        fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
                                                                                
           "postgres_fdw tuple data",
--- 957,996 ----
        server = GetForeignServer(table->serverid);
        user = GetUserMapping(userid, server->serverid);
  
+       /* Get private info created by planner functions. */
+       if (list_length(fsplan->fdw_private) >= MaxFdwScanFdwPrivateLength)
+       {
+               Assert(list_length(fsplan->fdw_private) == 
MaxFdwScanFdwPrivateLength);
+ 
+               fsstate->query = strVal(list_nth(fsplan->fdw_private,
+                                                                               
 FdwScanPrivateUpdateSql));
+               fsstate->set_processed = intVal(list_nth(fsplan->fdw_private,
+                                                                               
                 FdwScanPrivateSetProcessed));
+               fsstate->has_returning = intVal(list_nth(fsplan->fdw_private,
+                                                                               
                 FdwScanPrivateHasReturning));
+               fsstate->retrieved_attrs = (List *) 
list_nth(fsplan->fdw_private,
+                                                                       
FdwScanPrivateRetrievedAttrsByReturning);
+ 
+               fsstate->direct_update = true;
+       }
+       else
+       {
+               Assert(list_length(fsplan->fdw_private) == 
MinFdwScanFdwPrivateLength);
+ 
+               fsstate->query = strVal(list_nth(fsplan->fdw_private,
+                                                                               
 FdwScanPrivateSelectSql));
+               fsstate->retrieved_attrs = (List *) 
list_nth(fsplan->fdw_private,
+                                                                               
FdwScanPrivateRetrievedAttrsBySelect);
+ 
+               fsstate->direct_update = false;
+       }
+ 
        /*
         * Get connection to the foreign server.  Connection manager will
         * establish new connection if necessary.
         */
        fsstate->conn = GetConnection(server, user, false);
  
        /* Create contexts for batches of tuples and per-tuple temp workspace. 
*/
        fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
                                                                                
           "postgres_fdw tuple data",
***************
*** 941,946 **** postgresBeginForeignScan(ForeignScanState *node, int eflags)
--- 1006,1037 ----
        /* Get info we'll need for input data conversion. */
        fsstate->attinmeta = 
TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
  
+       if (fsstate->direct_update)
+       {
+               /*
+                * Execute the update statement, and check for success.
+                *
+                * We don't use a PG_TRY block here, so be careful not to throw 
error
+                * without releasing the PGresult.
+                */
+               fsstate->result = PQexec(fsstate->conn, fsstate->query);
+               if (PQresultStatus(fsstate->result) !=
+                       (fsstate->has_returning ? PGRES_TUPLES_OK : 
PGRES_COMMAND_OK))
+                       pgfdw_report_error(ERROR, fsstate->result, 
fsstate->conn, true,
+                                                          fsstate->query);
+ 
+               /* Check number of rows affected. */
+               if (fsstate->has_returning)
+                       fsstate->num_tuples = PQntuples(fsstate->result);
+               else
+                       fsstate->num_tuples = 
atoi(PQcmdTuples(fsstate->result));
+               return;
+       }
+ 
+       /* Assign a unique ID for my cursor */
+       fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+       fsstate->cursor_exists = false;
+ 
        /* Prepare for output conversion of parameters used in remote query. */
        numParams = list_length(fsplan->fdw_exprs);
        fsstate->numParams = numParams;
***************
*** 990,995 **** postgresIterateForeignScan(ForeignScanState *node)
--- 1081,1129 ----
        PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
        TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
  
+       if (fsstate->direct_update)
+       {
+               MemoryContext oldcontext;
+ 
+               if (!fsstate->has_returning)
+               {
+                       EState     *estate = node->ss.ps.state;
+ 
+                       if (fsstate->set_processed)
+                               estate->es_processed += fsstate->num_tuples;
+                       return ExecClearTuple(slot);
+               }
+ 
+               /* If we didn't get any tuples, must be end of data. */
+               if (fsstate->next_tuple >= fsstate->num_tuples)
+                       return ExecClearTuple(slot);
+ 
+               /* We'll store RETURNING tuples in the batch_cxt. */
+               oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
+ 
+               /* Fetch the next tuple. */
+               store_returning_result(slot,
+                                                          fsstate->result,
+                                                          fsstate->next_tuple,
+                                                          fsstate->rel,
+                                                          fsstate->attinmeta,
+                                                          
fsstate->retrieved_attrs,
+                                                          fsstate->temp_cxt);
+ 
+               MemoryContextSwitchTo(oldcontext);
+ 
+               /* Save the result. */
+               fsstate->rslot = slot;
+               fsstate->next_tuple++;
+ 
+               /*
+                * Return slot.  Note that this is safe because there are no local
+                * conditions and because the tuple contained in the slot is
+                * projected safely and then ignored (see also plan_direct_update).
+                */
+               return slot;
+       }
+ 
        /*
         * If this is the first call after Begin or ReScan, we need to create 
the
         * cursor on the remote side.
***************
*** 1090,1095 **** postgresEndForeignScan(ForeignScanState *node)
--- 1224,1236 ----
        if (fsstate == NULL)
                return;
  
+       if (fsstate->direct_update)
+       {
+               /* Clean up */
+               if (fsstate->result)
+                       PQclear(fsstate->result);
+       }
+ 
        /* Close the cursor if open, to prevent accumulation of cursors */
        if (fsstate->cursor_exists)
                close_cursor(fsstate->conn, fsstate->cursor_number);
***************
*** 1206,1211 **** postgresPlanForeignModify(PlannerInfo *root,
--- 1347,1378 ----
        }
  
        /*
+        * For an UPDATE/DELETE command, if there are no local conditions or 
joins
+        * needed (see check_direct_update for more details), let the scan node 
do
+        * UPDATE/DELETE RETURNING and then do nothing at ModifyTable.
+        */
+       if (operation == CMD_UPDATE || operation == CMD_DELETE)
+       {
+               /* Check whether it's safe to do direct update. */
+               if (check_direct_update(root, plan,
+                                                               resultRelation,
+                                                               subplan_index,
+                                                               rel, 
targetAttrs))
+               {
+                       List       *fdw_private;
+ 
+                       /* OK, generate a plan to do direct update. */
+                       fdw_private = plan_direct_update(root, plan,
+                                                                               
         resultRelation,
+                                                                               
         subplan_index,
+                                                                               
         rel, targetAttrs);
+ 
+                       heap_close(rel, NoLock);
+                       return fdw_private;
+               }
+       }
+ 
+       /*
         * Extract the relevant RETURNING list if any.
         */
        if (plan->returningLists)
***************
*** 1284,1289 **** postgresBeginForeignModify(ModifyTableState *mtstate,
--- 1451,1489 ----
        fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
        fmstate->rel = rel;
  
+       /* Deconstruct fdw_private data. */
+       fmstate->query = strVal(list_nth(fdw_private,
+                                                                        
FdwModifyPrivateUpdateSql));
+       fmstate->target_attrs = (List *) list_nth(fdw_private,
+                                                                               
          FdwModifyPrivateTargetAttnums);
+       fmstate->has_returning = intVal(list_nth(fdw_private,
+                                                                               
         FdwModifyPrivateHasReturning));
+       fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
+                                                                               
         FdwModifyPrivateRetrievedAttrs);
+ 
+       if (fmstate->query == NULL)
+       {
+               PlanState          *node = mtstate->mt_plans[subplan_index];
+               PgFdwScanState *fsstate;
+ 
+               Assert(fmstate->target_attrs == NIL);
+               Assert(fmstate->has_returning == false);
+               Assert(fmstate->retrieved_attrs == NIL);
+ 
+               Assert(nodeTag(node) == T_ForeignScanState);
+               fsstate = ((ForeignScanState *) node)->fdw_state;
+ 
+               Assert(fsstate->direct_update);
+               fmstate->direct_update = true;
+               if (fsstate->has_returning)
+               {
+                       fmstate->has_returning = true;
+                       fmstate->fsstate = fsstate;
+               }
+               resultRelInfo->ri_FdwState = fmstate;
+               return;
+       }
+ 
        /*
         * Identify which user to do the remote access as.  This should match 
what
         * ExecCheckRTEPerms() does.
***************
*** 1300,1315 **** postgresBeginForeignModify(ModifyTableState *mtstate,
        fmstate->conn = GetConnection(server, user, true);
        fmstate->p_name = NULL;         /* prepared statement not made yet */
  
-       /* Deconstruct fdw_private data. */
-       fmstate->query = strVal(list_nth(fdw_private,
-                                                                        
FdwModifyPrivateUpdateSql));
-       fmstate->target_attrs = (List *) list_nth(fdw_private,
-                                                                               
          FdwModifyPrivateTargetAttnums);
-       fmstate->has_returning = intVal(list_nth(fdw_private,
-                                                                               
         FdwModifyPrivateHasReturning));
-       fmstate->retrieved_attrs = (List *) list_nth(fdw_private,
-                                                                               
         FdwModifyPrivateRetrievedAttrs);
- 
        /* Create context for per-tuple temp workspace. */
        fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
                                                                                
          "postgres_fdw temporary data",
--- 1500,1505 ----
***************
*** 1407,1413 **** postgresExecForeignInsert(EState *estate,
        {
                n_rows = PQntuples(res);
                if (n_rows > 0)
!                       store_returning_result(fmstate, slot, res);
        }
        else
                n_rows = atoi(PQcmdTuples(res));
--- 1597,1607 ----
        {
                n_rows = PQntuples(res);
                if (n_rows > 0)
!                       store_returning_result(slot, res, 0,
!                                                                  fmstate->rel,
!                                                                  
fmstate->attinmeta,
!                                                                  
fmstate->retrieved_attrs,
!                                                                  
fmstate->temp_cxt);
        }
        else
                n_rows = atoi(PQcmdTuples(res));
***************
*** 1438,1443 **** postgresExecForeignUpdate(EState *estate,
--- 1632,1645 ----
        PGresult   *res;
        int                     n_rows;
  
+       /* Return slot created in the ForeignScan node when doing direct 
update. */
+       if (fmstate->direct_update)
+       {
+               Assert(fmstate->has_returning);
+               Assert(fmstate->fsstate->rslot);
+               return fmstate->fsstate->rslot;
+       }
+ 
        /* Set up the prepared statement on the remote server, if we didn't yet 
*/
        if (!fmstate->p_name)
                prepare_foreign_modify(fmstate);
***************
*** 1477,1483 **** postgresExecForeignUpdate(EState *estate,
        {
                n_rows = PQntuples(res);
                if (n_rows > 0)
!                       store_returning_result(fmstate, slot, res);
        }
        else
                n_rows = atoi(PQcmdTuples(res));
--- 1679,1689 ----
        {
                n_rows = PQntuples(res);
                if (n_rows > 0)
!                       store_returning_result(slot, res, 0,
!                                                                  fmstate->rel,
!                                                                  
fmstate->attinmeta,
!                                                                  
fmstate->retrieved_attrs,
!                                                                  
fmstate->temp_cxt);
        }
        else
                n_rows = atoi(PQcmdTuples(res));
***************
*** 1508,1513 **** postgresExecForeignDelete(EState *estate,
--- 1714,1727 ----
        PGresult   *res;
        int                     n_rows;
  
+       /* Return slot created in the ForeignScan node when doing direct 
update. */
+       if (fmstate->direct_update)
+       {
+               Assert(fmstate->has_returning);
+               Assert(fmstate->fsstate->rslot);
+               return fmstate->fsstate->rslot;
+       }
+ 
        /* Set up the prepared statement on the remote server, if we didn't yet 
*/
        if (!fmstate->p_name)
                prepare_foreign_modify(fmstate);
***************
*** 1547,1553 **** postgresExecForeignDelete(EState *estate,
        {
                n_rows = PQntuples(res);
                if (n_rows > 0)
!                       store_returning_result(fmstate, slot, res);
        }
        else
                n_rows = atoi(PQcmdTuples(res));
--- 1761,1771 ----
        {
                n_rows = PQntuples(res);
                if (n_rows > 0)
!                       store_returning_result(slot, res, 0,
!                                                                  fmstate->rel,
!                                                                  
fmstate->attinmeta,
!                                                                  
fmstate->retrieved_attrs,
!                                                                  
fmstate->temp_cxt);
        }
        else
                n_rows = atoi(PQcmdTuples(res));
***************
*** 1575,1580 **** postgresEndForeignModify(EState *estate,
--- 1793,1802 ----
        if (fmstate == NULL)
                return;
  
+       /* If doing direct update, there is nothing to do */
+       if (fmstate->direct_update)
+               return;
+ 
        /* If we created a prepared statement, destroy it */
        if (fmstate->p_name)
        {
***************
*** 1653,1663 **** postgresExplainForeignScan(ForeignScanState *node, 
ExplainState *es)
  {
        List       *fdw_private;
        char       *sql;
  
        if (es->verbose)
        {
                fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
!               sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
                ExplainPropertyText("Remote SQL", sql, es);
        }
  }
--- 1875,1900 ----
  {
        List       *fdw_private;
        char       *sql;
+       bool            direct_update;
  
        if (es->verbose)
        {
                fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
!               if (list_length(fdw_private) >= MaxFdwScanFdwPrivateLength)
!               {
!                       Assert(list_length(fdw_private) == 
MaxFdwScanFdwPrivateLength);
!                       direct_update = true;
!               }
!               else
!               {
!                       Assert(list_length(fdw_private) == 
MinFdwScanFdwPrivateLength);
!                       direct_update = false;
!               }
! 
!               if (direct_update)
!                       sql = strVal(list_nth(fdw_private, 
FdwScanPrivateUpdateSql));
!               else
!                       sql = strVal(list_nth(fdw_private, 
FdwScanPrivateSelectSql));
                ExplainPropertyText("Remote SQL", sql, es);
        }
  }
***************
*** 1678,1684 **** postgresExplainForeignModify(ModifyTableState *mtstate,
                char       *sql = strVal(list_nth(fdw_private,
                                                                                
  FdwModifyPrivateUpdateSql));
  
!               ExplainPropertyText("Remote SQL", sql, es);
        }
  }
  
--- 1915,1922 ----
                char       *sql = strVal(list_nth(fdw_private,
                                                                                
  FdwModifyPrivateUpdateSql));
  
!               if (sql != NULL)
!                       ExplainPropertyText("Remote SQL", sql, es);
        }
  }
  
***************
*** 1907,1912 **** ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
--- 2145,2338 ----
  }
  
  /*
+  * Check whether it's safe to update the foreign table directly.
+  *
+  * Returns true only if all of the following hold:
+  *
+  *   - the operation is UPDATE or DELETE, and the foreign table has no local
+  *     row-level BEFORE/AFTER triggers for that operation;
+  *   - the subplan is a ForeignScan that has no quals attached
+  *     (subplan->qual == NIL);
+  *   - every expression assigned to a column listed in targetAttrs is
+  *     accepted by is_foreign_expr(), i.e. may be evaluated remotely.
+  *
+  * rel is the foreign table's relation; only rel->trigdesc is examined.
+  * targetAttrs is an integer list of the attribute numbers being assigned
+  * (presumably empty for DELETE -- confirm against the caller).
+  *
+  * Raises an error if a target attribute has no matching entry in the
+  * subplan's targetlist.
+  */
+ static bool
+ check_direct_update(PlannerInfo *root,
+                                       ModifyTable *plan,
+                                       Index resultRelation,
+                                       int subplan_index,
+                                       Relation rel,
+                                       List *targetAttrs)
+ {
+       RelOptInfo *baserel = root->simple_rel_array[resultRelation];
+       Plan       *subplan = (Plan *) list_nth(plan->plans, subplan_index);
+       TriggerDesc *trigdesc = rel->trigdesc;
+       ListCell   *lc;
+ 
+       /*
+        * Local row-level triggers need the rows to pass through ModifyTable,
+        * so their presence rules out the direct path.  Look at the trigger
+        * flags that belong to the operation actually being planned; checking
+        * only the UPDATE flags would let a DELETE skip its DELETE triggers.
+        */
+       switch (plan->operation)
+       {
+               case CMD_UPDATE:
+                       if (trigdesc &&
+                               (trigdesc->trig_update_after_row ||
+                                trigdesc->trig_update_before_row))
+                               return false;
+                       break;
+               case CMD_DELETE:
+                       if (trigdesc &&
+                               (trigdesc->trig_delete_after_row ||
+                                trigdesc->trig_delete_before_row))
+                               return false;
+                       break;
+               default:
+                       /* Only UPDATE and DELETE can be done directly. */
+                       return false;
+       }
+ 
+       if (nodeTag(subplan) != T_ForeignScan || subplan->qual != NIL)
+               return false;
+ 
+       foreach(lc, targetAttrs)
+       {
+               int                     attnum = lfirst_int(lc);
+               TargetEntry *tle = get_tle_by_resno(subplan->targetlist, attnum);
+ 
+               /* get_tle_by_resno() returns NULL when there is no such entry */
+               if (tle == NULL)
+                       elog(ERROR, "attribute number %d not found in subplan targetlist",
+                                attnum);
+ 
+               if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
+                       return false;
+       }
+ 
+       return true;
+ }
+ 
+ /*
+  * Generate a plan to update the foreign table directly.
+  *
+  * Handles UPDATE and DELETE only (asserted).  The function:
+  *
+  *   1. builds the remote statement with deparseDirectUpdateSql() or
+  *      deparseDirectDeleteSql(), using the remote conditions already stored
+  *      in the ForeignScan's fdw_private and the RETURNING list, if any,
+  *      that belongs to this subplan;
+  *   2. appends four items to the ForeignScan's fdw_private: the SQL string,
+  *      canSetTag, a has-returning flag, and retrieved_attrs;
+  *   3. for UPDATE, replaces the subplan's targetlist so that each entry for
+  *      an assigned column becomes a plain Var of the result relation;
+  *   4. returns a four-item fdw_private list for the ModifyTable node whose
+  *      first item is a String node wrapping NULL.
+  *
+  * Side effects: fscan->fdw_private and, for UPDATE, subplan->targetlist are
+  * modified in place.
+  *
+  * NOTE(review): the executor side of this patch appears to treat the NULL
+  * query string as the "direct update" marker -- confirm against
+  * postgresBeginForeignModify.
+  */
+ static List *
+ plan_direct_update(PlannerInfo *root,
+                                  ModifyTable *plan,
+                                  Index resultRelation,
+                                  int subplan_index,
+                                  Relation rel,
+                                  List *targetAttrs)
+ {
+       CmdType         operation = plan->operation;
+       bool            canSetTag = plan->canSetTag;
+       Plan       *subplan = (Plan *) list_nth(plan->plans, subplan_index);
+       ForeignScan *fscan = (ForeignScan *) subplan;
+       StringInfoData sql;
+       List       *remote_conds;
+       List       *returningList = NIL;
+       List       *retrieved_attrs = NIL;
+       List       *new_tlist = NIL;
+       List       *fdw_private;
+ 
+       Assert(operation == CMD_UPDATE || operation == CMD_DELETE);
+ 
+       initStringInfo(&sql);
+ 
+       /*
+        * Extract the baserestrictinfo clauses that can be evaluated remotely.
+        * They are read back from the fdw_private list already attached to the
+        * ForeignScan node.
+        */
+       remote_conds = (List *) list_nth(fscan->fdw_private,
+                                                                        
FdwScanPrivateRemoteConds);
+ 
+       /*
+        * Extract the relevant RETURNING list if any.
+        */
+       if (plan->returningLists)
+               returningList = (List *) list_nth(plan->returningLists, 
subplan_index);
+ 
+       /*
+        * Construct the SQL command string.  retrieved_attrs is filled in by
+        * the deparse routine.
+        */
+       if (operation == CMD_UPDATE)
+       {
+               List       *targetlist = subplan->targetlist;
+ 
+               deparseDirectUpdateSql(&sql, root, resultRelation, rel,
+                                                          remote_conds,
+                                                          targetlist,
+                                                          targetAttrs,
+                                                          returningList,
+                                                          &retrieved_attrs);
+       }
+       else
+       {
+               Assert(operation == CMD_DELETE);
+ 
+               deparseDirectDeleteSql(&sql, root, resultRelation, rel,
+                                                          remote_conds,
+                                                          returningList,
+                                                          &retrieved_attrs);
+       }
+ 
+       /*
+        * Update the fdw_private list that will be available to the executor.
+        * Items in the list must match enum FdwScanPrivateIndex, above.
+        *
+        * Appended in this order: SQL string, canSetTag, has-returning flag
+        * (true when retrieved_attrs is non-empty), retrieved_attrs.
+        */
+       fscan->fdw_private = lappend(fscan->fdw_private, makeString(sql.data));
+       fscan->fdw_private = lappend(fscan->fdw_private, 
makeInteger(canSetTag));
+       fscan->fdw_private = lappend(fscan->fdw_private,
+                                                                
makeInteger((retrieved_attrs != NIL)));
+       fscan->fdw_private = lappend(fscan->fdw_private, retrieved_attrs);
+ 
+       /*
+        * Rewrite the targetlist for an UPDATE command for safety of
+        * ExecProject.  Note we ignore and do not reference result tuples in
+        * the direct update case.
+        *
+        * Junk entries are copied unchanged and do not advance attrno, so
+        * attrno counts non-junk entries only and is checked against the
+        * relation's attribute count.
+        */
+       if (operation == CMD_UPDATE)
+       {
+               ListCell   *lc;
+               int                     attrno = 1;
+               int                     numattrs = 
RelationGetNumberOfAttributes(rel);
+ 
+               foreach(lc, subplan->targetlist)
+               {
+                       TargetEntry *tle = (TargetEntry *) lfirst(lc);
+ 
+                       if (tle->resjunk)
+                       {
+                               new_tlist = lappend(new_tlist, tle);
+                               continue;
+                       }
+ 
+                       if (attrno > numattrs)
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_DATATYPE_MISMATCH),
+                                                errmsg("table row type and 
query-specified row type do not match"),
+                                                errdetail("Query has too many 
columns.")));
+ 
+                       /* Column is not being assigned: keep its entry as is. */
+                       if (!list_member_int(targetAttrs, attrno))
+                               new_tlist = lappend(new_tlist, tle);
+                       else
+                       {
+                               /*
+                                * Assigned column: substitute a plain Var of the
+                                * result relation's column for the expression.
+                                */
+                               Form_pg_attribute attr;
+                               Oid                     atttype;
+                               int32           atttypmod;
+                               Oid                     attcollation;
+                               Node       *new_expr;
+                               TargetEntry *new_tle;
+ 
+                               attr = rel->rd_att->attrs[attrno - 1];
+ 
+                               Assert(!attr->attisdropped);
+                               atttype = attr->atttypid;
+                               atttypmod = attr->atttypmod;
+                               attcollation = attr->attcollation;
+ 
+                               new_expr = (Node *) makeVar(resultRelation,
+                                                                               
        attrno,
+                                                                               
        atttype,
+                                                                               
        atttypmod,
+                                                                               
        attcollation,
+                                                                               
        0);
+ 
+                               new_tle = makeTargetEntry((Expr *) new_expr,
+                                                                               
  attrno,
+                                                                               
  pstrdup(NameStr(attr->attname)),
+                                                                               
  false);
+ 
+                               new_tlist = lappend(new_tlist, new_tle);
+                       }
+ 
+                       attrno++;
+               }
+ 
+               if (attrno != numattrs + 1)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_DATATYPE_MISMATCH),
+                                        errmsg("table row type and 
query-specified row type do not match"),
+                                        errdetail("Query has too few 
columns.")));
+ 
+               subplan->targetlist = new_tlist;
+       }
+ 
+       /*
+        * Build the fdw_private list that will be available to the executor.
+        * Items in the list must match enum FdwModifyPrivateIndex, above.
+        *
+        * All four items are placeholders here: a NULL query string, no target
+        * attributes, has-returning false, and no retrieved attributes.
+        */
+       fdw_private = list_make4(makeString(NULL), NIL, makeInteger(false), 
NIL);
+ 
+       return fdw_private;
+ }
+ 
+ /*
   * Create cursor for node's query with current parameter values.
   */
  static void
***************
*** 2254,2272 **** convert_prep_stmt_params(PgFdwModifyState *fmstate,
   * have PG_TRY blocks to ensure this happens.
   */
  static void
! store_returning_result(PgFdwModifyState *fmstate,
!                                          TupleTableSlot *slot, PGresult *res)
  {
        /* PGresult must be released before leaving this function. */
        PG_TRY();
        {
                HeapTuple       newtup;
  
!               newtup = make_tuple_from_result_row(res, 0,
!                                                                               
        fmstate->rel,
!                                                                               
        fmstate->attinmeta,
!                                                                               
        fmstate->retrieved_attrs,
!                                                                               
        fmstate->temp_cxt);
                /* tuple will be deleted when it is cleared from the slot */
                ExecStoreTuple(newtup, slot, InvalidBuffer, true);
        }
--- 2680,2703 ----
   * have PG_TRY blocks to ensure this happens.
   */
  static void
! store_returning_result(TupleTableSlot *slot,
!                                          PGresult *res,
!                                          int row,
!                                          Relation rel,
!                                          AttInMetadata *attinmeta,
!                                          List *retrieved_attrs,
!                                          MemoryContext temp_context)
  {
        /* PGresult must be released before leaving this function. */
        PG_TRY();
        {
                HeapTuple       newtup;
  
!               newtup = make_tuple_from_result_row(res, row,
!                                                                               
        rel,
!                                                                               
        attinmeta,
!                                                                               
        retrieved_attrs,
!                                                                               
        temp_context);
                /* tuple will be deleted when it is cleared from the slot */
                ExecStoreTuple(newtup, slot, InvalidBuffer, true);
        }
*** a/contrib/postgres_fdw/postgres_fdw.h
--- b/contrib/postgres_fdw/postgres_fdw.h
***************
*** 66,75 **** extern void deparseUpdateSql(StringInfo buf, PlannerInfo *root,
--- 66,87 ----
                                 Index rtindex, Relation rel,
                                 List *targetAttrs, List *returningList,
                                 List **retrieved_attrs);
+ extern void deparseDirectUpdateSql(StringInfo buf, PlannerInfo *root,
+                                                                  Index 
rtindex, Relation rel,
+                                                                  List 
*remote_conds,
+                                                                  List 
*targetlist,
+                                                                  List 
*targetAttrs,
+                                                                  List 
*returningList,
+                                                                  List 
**retrieved_attrs);
  extern void deparseDeleteSql(StringInfo buf, PlannerInfo *root,
                                 Index rtindex, Relation rel,
                                 List *returningList,
                                 List **retrieved_attrs);
+ extern void deparseDirectDeleteSql(StringInfo buf, PlannerInfo *root,
+                                                                  Index 
rtindex, Relation rel,
+                                                                  List 
*remote_conds,
+                                                                  List 
*returningList,
+                                                                  List 
**retrieved_attrs);
  extern void deparseAnalyzeSizeSql(StringInfo buf, Relation rel);
  extern void deparseAnalyzeSql(StringInfo buf, Relation rel,
                                  List **retrieved_attrs);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to