diff --git a/doc/src/sgml/ddl.sgml b/doc/src/sgml/ddl.sgml
index b05a9c2..5a436a1 100644
--- a/doc/src/sgml/ddl.sgml
+++ b/doc/src/sgml/ddl.sgml
@@ -2993,6 +2993,11 @@ VALUES ('Albany', NULL, NULL, 'NY');
     foreign table partitions.
    </para>
 
+   <para>
+    Updating the partition key of a row might cause it to be moved into a
+    different partition, namely the one whose partition constraint is
+    satisfied by the new row.
+   </para>
+
    <sect3 id="ddl-partitioning-declarative-example">
     <title>Example</title>
 
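As a quick illustration of the behaviour documented in the paragraph added above, here is a minimal SQL sketch against a hypothetical list-partitioned table (the table and partition names are invented for illustration and are not part of this patch):

    -- hypothetical setup
    CREATE TABLE parts (region text, name text) PARTITION BY LIST (region);
    CREATE TABLE parts_eu PARTITION OF parts FOR VALUES IN ('eu');
    CREATE TABLE parts_us PARTITION OF parts FOR VALUES IN ('us');
    INSERT INTO parts VALUES ('eu', 'widget');

    -- with this patch, changing the partition key re-routes the row into
    -- parts_us instead of failing with a partition constraint violation
    UPDATE parts SET region = 'us' WHERE name = 'widget';

    SELECT tableoid::regclass, * FROM parts;  -- row now reported under parts_us
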
@@ -3285,9 +3290,20 @@ ALTER TABLE measurement ATTACH PARTITION measurement_y2008m02
 
      <listitem>
       <para>
-       An <command>UPDATE</> that causes a row to move from one partition to
-       another fails, because the new value of the row fails to satisfy the
-       implicit partition constraint of the original partition.
+       When an <command>UPDATE</> causes a row to move from one partition to
+       another, there is a chance that a concurrent <command>UPDATE</> or
+       <command>DELETE</> misses this row.  Suppose that, while the row is
+       being moved, it is still visible to a concurrent session, which is
+       about to <command>UPDATE</> or <command>DELETE</> that same row.  The
+       concurrent DML operation can silently miss the row if the row is
+       deleted from the partition by the first session as part of its
+       <command>UPDATE</> row movement.  In that case, the concurrent
+       <command>UPDATE</>/<command>DELETE</>, being unaware of the row
+       movement, concludes that the row has just been deleted and that there
+       is nothing to be done for it.  In contrast, in the usual case where the
+       table is not partitioned, or where there is no row movement, the second
+       session would have identified the newly updated row and carried out the
+       <command>UPDATE</>/<command>DELETE</> on this new row version.
       </para>
      </listitem>
 
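The limitation described above is easier to follow as a two-session sketch. This continues the hypothetical parts table from the earlier example and is purely illustrative; it is a transcript of two interleaved sessions, not a single runnable script:

    -- Session 1: moves the row from parts_eu to parts_us
    BEGIN;
    UPDATE parts SET region = 'us' WHERE name = 'widget';

    -- Session 2: tries to update the same row and blocks on session 1
    UPDATE parts SET name = 'gadget' WHERE name = 'widget';

    -- Session 1:
    COMMIT;

    -- Session 2's UPDATE now sees the old row version as deleted (the row was
    -- removed from parts_eu as part of the row movement) and reports UPDATE 0,
    -- even though a newer version of the row exists in parts_us.
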
diff --git a/doc/src/sgml/ref/update.sgml b/doc/src/sgml/ref/update.sgml
index 8a1619f..28cfc1a 100644
--- a/doc/src/sgml/ref/update.sgml
+++ b/doc/src/sgml/ref/update.sgml
@@ -282,10 +282,17 @@ UPDATE <replaceable class="parameter">count</replaceable>
 
   <para>
    In the case of a partitioned table, updating a row might cause it to no
-   longer satisfy the partition constraint.  Since there is no provision to
-   move the row to the partition appropriate to the new value of its
-   partitioning key, an error will occur in this case.  This can also happen
-   when updating a partition directly.
+   longer satisfy the partition constraint of the containing partition.  In
+   that case, if there is some other partition in the partition tree for
+   which this row satisfies its partition constraint, then the row is moved
+   to that partition.  If there is no such partition, an error will occur.
+   The error will also occur when updating a partition directly.  Behind the
+   scenes, the row movement is actually a <command>DELETE</> and
+   <command>INSERT</> operation.  However, a concurrent
+   <command>UPDATE</> or <command>DELETE</> on the same row may miss this
+   row.  For details, see
+   <xref linkend="ddl-partitioning-declarative-limitations">.
+
   </para>
  </refsect1>
 
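Continuing the same hypothetical parts table, the two error cases mentioned above would look roughly like this (error texts are approximate and shown only as comments):

    -- no partition of parts accepts region = 'asia', so the row cannot be
    -- routed anywhere and the UPDATE fails
    UPDATE parts SET region = 'asia' WHERE name = 'widget';
    -- ERROR:  no partition of relation "parts" found for row

    -- updating a leaf partition directly never moves the row; a new value
    -- outside the partition's bounds raises a constraint violation instead
    UPDATE parts_us SET region = 'eu' WHERE name = 'widget';
    -- ERROR:  new row for relation "parts_us" violates partition constraint
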
diff --git a/doc/src/sgml/trigger.sgml b/doc/src/sgml/trigger.sgml
index 950245d..72300a0 100644
--- a/doc/src/sgml/trigger.sgml
+++ b/doc/src/sgml/trigger.sgml
@@ -160,6 +160,29 @@
    </para>
 
    <para>
+    If an <command>UPDATE</command> on a partitioned table causes a row to
+    move to another partition, it will be performed as a
+    <command>DELETE</command> from the original partition followed by an
+    <command>INSERT</command> into the new partition. In this case, all
+    row-level <literal>BEFORE</> <command>UPDATE</command> triggers and all
+    row-level <literal>BEFORE</> <command>DELETE</command> triggers are fired
+    on the original partition. Then all row-level <literal>BEFORE</>
+    <command>INSERT</command> triggers are fired on the destination partition.
+    The possibility of surprising outcomes should be considered when all these
+    triggers affect the row being moved. As far as <literal>AFTER ROW</>
+    triggers are concerned, <literal>AFTER</> <command>DELETE</command> and
+    <literal>AFTER</> <command>INSERT</command> triggers are applied; but
+    <literal>AFTER</> <command>UPDATE</command> triggers are not applied
+    because the <command>UPDATE</command> has been converted to a
+    <command>DELETE</command> and an <command>INSERT</command>. As far as
+    statement-level triggers are concerned, none of the
+    <command>DELETE</command> or <command>INSERT</command> triggers are fired,
+    even if row movement occurs; only the <command>UPDATE</command> triggers
+    defined on the target table used in the <command>UPDATE</command> statement
+    will be fired.
+   </para>
+
+   <para>
     Trigger functions invoked by per-statement triggers should always
     return <symbol>NULL</symbol>. Trigger functions invoked by per-row
     triggers can return a table row (a value of
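
The firing order described above can be observed with a pair of hypothetical logging triggers on the same parts table from the earlier sketches (names invented; EXECUTE PROCEDURE matches the syntax used in the regression tests of this patch):

    CREATE FUNCTION log_trig() RETURNS trigger AS $$
    BEGIN
        RAISE NOTICE '% % ON %', TG_WHEN, TG_OP, TG_TABLE_NAME;
        IF TG_OP = 'DELETE' THEN
            RETURN OLD;
        END IF;
        RETURN NEW;
    END $$ LANGUAGE plpgsql;

    CREATE TRIGGER log_src BEFORE UPDATE OR DELETE ON parts_eu
        FOR EACH ROW EXECUTE PROCEDURE log_trig();
    CREATE TRIGGER log_dst BEFORE INSERT ON parts_us
        FOR EACH ROW EXECUTE PROCEDURE log_trig();

    INSERT INTO parts VALUES ('eu', 'sprocket');

    -- Moving a row from parts_eu to parts_us is expected to emit, in order:
    --   NOTICE:  BEFORE UPDATE ON parts_eu
    --   NOTICE:  BEFORE DELETE ON parts_eu
    --   NOTICE:  BEFORE INSERT ON parts_us
    -- and no AFTER UPDATE trigger would fire for the moved row.
    UPDATE parts SET region = 'us' WHERE region = 'eu';
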
diff --git a/src/backend/catalog/partition.c b/src/backend/catalog/partition.c
index f8c55b1..c9f5dd6 100644
--- a/src/backend/catalog/partition.c
+++ b/src/backend/catalog/partition.c
@@ -921,7 +921,8 @@ get_qual_from_partbound(Relation rel, Relation parent,
 
 /*
  * map_partition_varattnos - maps varattno of any Vars in expr from the
- * parent attno to partition attno.
+ * attnos of 'from_rel' to the attnos of 'to_rel'.  Either rel can be a leaf
+ * partition or a partitioned table.
  *
  * We must allow for cases where physical attnos of a partition can be
  * different from the parent's.
@@ -931,8 +932,8 @@ get_qual_from_partbound(Relation rel, Relation parent,
  * are working on Lists, so it's less messy to do the casts internally.
  */
 List *
-map_partition_varattnos(List *expr, int target_varno,
-						Relation partrel, Relation parent)
+map_partition_varattnos(List *expr, int fromrel_varno,
+						Relation to_rel, Relation from_rel)
 {
 	AttrNumber *part_attnos;
 	bool		found_whole_row;
@@ -940,13 +941,13 @@ map_partition_varattnos(List *expr, int target_varno,
 	if (expr == NIL)
 		return NIL;
 
-	part_attnos = convert_tuples_by_name_map(RelationGetDescr(partrel),
-											 RelationGetDescr(parent),
+	part_attnos = convert_tuples_by_name_map(RelationGetDescr(to_rel),
+											 RelationGetDescr(from_rel),
 											 gettext_noop("could not convert row type"));
 	expr = (List *) map_variable_attnos((Node *) expr,
-										target_varno, 0,
+										fromrel_varno, 0,
 										part_attnos,
-										RelationGetDescr(parent)->natts,
+										RelationGetDescr(from_rel)->natts,
 										&found_whole_row);
 	/* There can never be a whole-row reference here */
 	if (found_whole_row)
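
The attno mapping exists because a partition attached with a different column order has different attribute numbers than its parent (or than a sibling). A hypothetical sketch, mirroring the reordered-column partitions used in the regression tests below:

    CREATE TABLE pt (a text, b int) PARTITION BY RANGE (b);
    CREATE TABLE pt1 (b int, a text);          -- columns deliberately reordered
    ALTER TABLE pt ATTACH PARTITION pt1 FOR VALUES FROM (1) TO (10);

    SELECT attrelid::regclass AS rel, attname, attnum
    FROM pg_attribute
    WHERE attrelid IN ('pt'::regclass, 'pt1'::regclass) AND attnum > 0
    ORDER BY rel::text, attnum;
    -- 'a' is attnum 1 in pt but attnum 2 in pt1 (and vice versa for 'b'),
    -- which is why Vars in expressions such as WCO or RETURNING lists must
    -- have their varattnos translated when copied from one rel to the other.
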
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f391828..2706af2 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -168,7 +168,7 @@ typedef struct CopyStateData
 	PartitionDispatch *partition_dispatch_info;
 	int			num_dispatch;	/* Number of entries in the above array */
 	int			num_partitions; /* Number of members in the following arrays */
-	ResultRelInfo *partitions;	/* Per partition result relation */
+	ResultRelInfo **partitions;	/* Per partition result relation pointers */
 	TupleConversionMap **partition_tupconv_maps;
 	TupleTableSlot *partition_tuple_slot;
 	TransitionCaptureState *transition_capture;
@@ -1426,13 +1426,13 @@ BeginCopy(ParseState *pstate,
 		if (is_from && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
 		{
 			PartitionDispatch *partition_dispatch_info;
-			ResultRelInfo *partitions;
+			ResultRelInfo **partitions;
 			TupleConversionMap **partition_tupconv_maps;
 			TupleTableSlot *partition_tuple_slot;
 			int			num_parted,
 						num_partitions;
 
-			ExecSetupPartitionTupleRouting(rel,
+			ExecSetupPartitionTupleRouting(rel, NULL, 0,
 										   &partition_dispatch_info,
 										   &partitions,
 										   &partition_tupconv_maps,
@@ -1461,7 +1461,7 @@ BeginCopy(ParseState *pstate,
 				for (i = 0; i < cstate->num_partitions; ++i)
 				{
 					cstate->transition_tupconv_maps[i] =
-						convert_tuples_by_name(RelationGetDescr(cstate->partitions[i].ri_RelationDesc),
+						convert_tuples_by_name(RelationGetDescr(cstate->partitions[i]->ri_RelationDesc),
 											   RelationGetDescr(rel),
 											   gettext_noop("could not convert row type"));
 				}
@@ -2608,7 +2608,7 @@ CopyFrom(CopyState cstate)
 			 * to the selected partition.
 			 */
 			saved_resultRelInfo = resultRelInfo;
-			resultRelInfo = cstate->partitions + leaf_part_index;
+			resultRelInfo = cstate->partitions[leaf_part_index];
 
 			/* We do not yet have a way to insert into a foreign partition */
 			if (resultRelInfo->ri_FdwRoutine)
@@ -2717,7 +2717,7 @@ CopyFrom(CopyState cstate)
 
 				/* Check the constraints of the tuple */
 				if (cstate->rel->rd_att->constr || check_partition_constr)
-					ExecConstraints(resultRelInfo, slot, estate);
+					ExecConstraints(resultRelInfo, slot, estate, true);
 
 				if (useHeapMultiInsert)
 				{
@@ -2837,7 +2837,7 @@ CopyFrom(CopyState cstate)
 		}
 		for (i = 0; i < cstate->num_partitions; i++)
 		{
-			ResultRelInfo *resultRelInfo = cstate->partitions + i;
+			ResultRelInfo *resultRelInfo = cstate->partitions[i];
 
 			ExecCloseIndices(resultRelInfo);
 			heap_close(resultRelInfo->ri_RelationDesc, NoLock);
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 0f08283..e448d18 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -64,6 +64,18 @@
 #include "utils/snapmgr.h"
 #include "utils/tqual.h"
 
+/*
+ * Entry of a temporary hash table.  During UPDATE tuple routing, we want to
+ * know which of the leaf partitions are already present in the UPDATE
+ * per-subplan resultRelInfo array (ModifyTableState->resultRelInfo[]).  The
+ * hash table is keyed by the OIDs of the subplan result rels.
+ */
+typedef struct ResultRelOidsEntry
+{
+	Oid			rel_oid;
+	ResultRelInfo *resultRelInfo;
+} ResultRelOidsEntry;
+
 
 /* Hooks for plugins to get control in ExecutorStart/Run/Finish/End */
 ExecutorStart_hook_type ExecutorStart_hook = NULL;
@@ -103,8 +115,6 @@ static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
 									 int maxfieldlen);
 static void EvalPlanQualStart(EPQState *epqstate, EState *parentestate,
 				  Plan *planTree);
-static void ExecPartitionCheck(ResultRelInfo *resultRelInfo,
-				   TupleTableSlot *slot, EState *estate);
 
 /*
  * Note that GetUpdatedColumns() also exists in commands/trigger.c.  There does
@@ -1823,15 +1833,10 @@ ExecRelCheck(ResultRelInfo *resultRelInfo,
 /*
  * ExecPartitionCheck --- check that tuple meets the partition constraint.
  */
-static void
+bool
 ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
 				   EState *estate)
 {
-	Relation	rel = resultRelInfo->ri_RelationDesc;
-	TupleDesc	tupdesc = RelationGetDescr(rel);
-	Bitmapset  *modifiedCols;
-	Bitmapset  *insertedCols;
-	Bitmapset  *updatedCols;
 	ExprContext *econtext;
 
 	/*
@@ -1859,51 +1864,65 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
 	 * As in case of the catalogued constraints, we treat a NULL result as
 	 * success here, not a failure.
 	 */
-	if (!ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext))
-	{
-		char	   *val_desc;
-		Relation	orig_rel = rel;
+	return ExecCheck(resultRelInfo->ri_PartitionCheckExpr, econtext);
+}
+
+/*
+ * ExecPartitionCheckEmitError - Form and emit an error message after a failed
+ * partition constraint check.
+ */
+void
+ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
+							TupleTableSlot *slot,
+							EState *estate)
+{
+	Relation	rel = resultRelInfo->ri_RelationDesc;
+	Relation	orig_rel = rel;
+	TupleDesc	tupdesc = RelationGetDescr(rel);
+	char	   *val_desc;
+	Bitmapset  *modifiedCols;
+	Bitmapset  *insertedCols;
+	Bitmapset  *updatedCols;
 
-		/* See the comment above. */
-		if (resultRelInfo->ri_PartitionRoot)
+	/* See the comments in ExecConstraints. */
+	if (resultRelInfo->ri_PartitionRoot)
+	{
+		HeapTuple	tuple = ExecFetchSlotTuple(slot);
+		TupleDesc	old_tupdesc = RelationGetDescr(rel);
+		TupleConversionMap *map;
+
+		rel = resultRelInfo->ri_PartitionRoot;
+		tupdesc = RelationGetDescr(rel);
+		/* a reverse map */
+		map = convert_tuples_by_name(old_tupdesc, tupdesc,
+									 gettext_noop("could not convert row type"));
+		if (map != NULL)
 		{
-			HeapTuple	tuple = ExecFetchSlotTuple(slot);
-			TupleDesc	old_tupdesc = RelationGetDescr(rel);
-			TupleConversionMap *map;
-
-			rel = resultRelInfo->ri_PartitionRoot;
-			tupdesc = RelationGetDescr(rel);
-			/* a reverse map */
-			map = convert_tuples_by_name(old_tupdesc, tupdesc,
-										 gettext_noop("could not convert row type"));
-			if (map != NULL)
-			{
-				tuple = do_convert_tuple(tuple, map);
-				ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-			}
+			tuple = do_convert_tuple(tuple, map);
+			ExecStoreTuple(tuple, slot, InvalidBuffer, false);
 		}
-
-		insertedCols = GetInsertedColumns(resultRelInfo, estate);
-		updatedCols = GetUpdatedColumns(resultRelInfo, estate);
-		modifiedCols = bms_union(insertedCols, updatedCols);
-		val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
-												 slot,
-												 tupdesc,
-												 modifiedCols,
-												 64);
-		ereport(ERROR,
-				(errcode(ERRCODE_CHECK_VIOLATION),
-				 errmsg("new row for relation \"%s\" violates partition constraint",
-						RelationGetRelationName(orig_rel)),
-				 val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
 	}
+
+	insertedCols = GetInsertedColumns(resultRelInfo, estate);
+	updatedCols = GetUpdatedColumns(resultRelInfo, estate);
+	modifiedCols = bms_union(insertedCols, updatedCols);
+	val_desc = ExecBuildSlotValueDescription(RelationGetRelid(rel),
+											 slot,
+											 tupdesc,
+											 modifiedCols,
+											 64);
+	ereport(ERROR,
+			(errcode(ERRCODE_CHECK_VIOLATION),
+			 errmsg("new row for relation \"%s\" violates partition constraint",
+					RelationGetRelationName(orig_rel)),
+			 val_desc ? errdetail("Failing row contains %s.", val_desc) : 0));
 }
 
 /*
  * ExecConstraints - check constraints of the tuple in 'slot'
  *
- * This checks the traditional NOT NULL and check constraints, as well as
- * the partition constraint, if any.
+ * This checks the traditional NOT NULL and check constraints, and if requested,
+ * checks the partition constraint.
  *
  * Note: 'slot' contains the tuple to check the constraints of, which may
  * have been converted from the original input tuple after tuple routing.
@@ -1911,7 +1930,8 @@ ExecPartitionCheck(ResultRelInfo *resultRelInfo, TupleTableSlot *slot,
  */
 void
 ExecConstraints(ResultRelInfo *resultRelInfo,
-				TupleTableSlot *slot, EState *estate)
+				TupleTableSlot *slot, EState *estate,
+				bool check_partition_constraint)
 {
 	Relation	rel = resultRelInfo->ri_RelationDesc;
 	TupleDesc	tupdesc = RelationGetDescr(rel);
@@ -2024,8 +2044,9 @@ ExecConstraints(ResultRelInfo *resultRelInfo,
 		}
 	}
 
-	if (resultRelInfo->ri_PartitionCheck)
-		ExecPartitionCheck(resultRelInfo, slot, estate);
+	if (check_partition_constraint && resultRelInfo->ri_PartitionCheck &&
+		!ExecPartitionCheck(resultRelInfo, slot, estate))
+		ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
 }
 
 
@@ -3190,10 +3211,14 @@ EvalPlanQualEnd(EPQState *epqstate)
  * ExecSetupPartitionTupleRouting - set up information needed during
  * tuple routing for partitioned tables
  *
+ * 'update_rri' contains the UPDATE per-subplan result rels.
+ * 'num_update_rri' is the number of UPDATE per-subplan result rels; for
+ *      INSERT, this is 0.
+ *
  * Output arguments:
  * 'pd' receives an array of PartitionDispatch objects with one entry for
  *		every partitioned table in the partition tree
- * 'partitions' receives an array of ResultRelInfo objects with one entry for
+ * 'partitions' receives an array of ResultRelInfo pointers with one entry for
  *		every leaf partition in the partition tree
  * 'tup_conv_maps' receives an array of TupleConversionMap objects with one
  *		entry for every leaf partition (required to convert input tuple based
@@ -3213,8 +3238,10 @@ EvalPlanQualEnd(EPQState *epqstate)
  */
 void
 ExecSetupPartitionTupleRouting(Relation rel,
+							   ResultRelInfo *update_rri,
+							   int num_update_rri,
 							   PartitionDispatch **pd,
-							   ResultRelInfo **partitions,
+							   ResultRelInfo ***partitions,
 							   TupleConversionMap ***tup_conv_maps,
 							   TupleTableSlot **partition_tuple_slot,
 							   int *num_parted, int *num_partitions)
@@ -3223,18 +3250,60 @@ ExecSetupPartitionTupleRouting(Relation rel,
 	List	   *leaf_parts;
 	ListCell   *cell;
 	int			i;
-	ResultRelInfo *leaf_part_rri;
+	HTAB	   *result_rel_oids = NULL;
+	HASHCTL		ctl;
+	ResultRelOidsEntry *hash_entry;
+	ResultRelInfo *leaf_part_arr;
 
 	/* Get the tuple-routing information and lock partitions */
 	*pd = RelationGetPartitionDispatchInfo(rel, RowExclusiveLock, num_parted,
 										   &leaf_parts);
 	*num_partitions = list_length(leaf_parts);
-	*partitions = (ResultRelInfo *) palloc(*num_partitions *
-										   sizeof(ResultRelInfo));
+	*partitions = (ResultRelInfo **) palloc(*num_partitions *
+										   sizeof(ResultRelInfo*));
 	*tup_conv_maps = (TupleConversionMap **) palloc0(*num_partitions *
 													 sizeof(TupleConversionMap *));
 
 	/*
+	 * For UPDATEs, if a leaf partition is already present in the per-subplan
+	 * result rels, we re-use it rather than initialize a new result rel.  To
+	 * find out whether a given leaf partition already has a result rel, we
+	 * build a hash table keyed by the OIDs of the subplan result rels.
+	 */
+	if (num_update_rri != 0)
+	{
+		ResultRelInfo	   *resultRelInfo;
+
+		memset(&ctl, 0, sizeof(ctl));
+		ctl.keysize = sizeof(Oid);
+		ctl.entrysize = sizeof(ResultRelOidsEntry);
+		ctl.hcxt = CurrentMemoryContext;
+		result_rel_oids = hash_create("result_rel_oids temporary hash",
+								32, /* start small and extend */
+								&ctl,
+								HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+		resultRelInfo = update_rri;
+		for (i = 0; i < num_update_rri; i++, resultRelInfo++)
+		{
+			Oid reloid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
+
+			hash_entry = hash_search(result_rel_oids, &reloid,
+									 HASH_ENTER, NULL);
+			hash_entry->resultRelInfo = resultRelInfo;
+		}
+	}
+	else
+	{
+		/*
+		 * For inserts, we need to create all new result rels, so avoid repeated
+		 * pallocs by allocating memory for all the result rels in bulk.
+		 */
+		leaf_part_arr = (ResultRelInfo *) palloc0(*num_partitions *
+												  sizeof(ResultRelInfo));
+	}
+
+	/*
 	 * Initialize an empty slot that will be used to manipulate tuples of any
 	 * given partition's rowtype.  It is attached to the caller-specified node
 	 * (such as ModifyTableState) and released when the node finishes
@@ -3242,23 +3311,65 @@ ExecSetupPartitionTupleRouting(Relation rel,
 	 */
 	*partition_tuple_slot = MakeTupleTableSlot();
 
-	leaf_part_rri = *partitions;
 	i = 0;
 	foreach(cell, leaf_parts)
 	{
-		Relation	partrel;
+		ResultRelInfo *leaf_part_rri;
+		Relation	partrel = NULL;
 		TupleDesc	part_tupdesc;
+		Oid			leaf_oid = lfirst_oid(cell);
+
+		if (num_update_rri != 0)
+		{
+			/*
+			 * If this leaf partition is already present in the per-subplan
+			 * resultRelInfos, re-use that resultRelInfo along with its
+			 * already-opened relation; otherwise create a new result rel.
+			 */
+			hash_entry = hash_search(result_rel_oids, &leaf_oid,
+									 HASH_FIND, NULL);
+			if (hash_entry != NULL)
+			{
+				leaf_part_rri = hash_entry->resultRelInfo;
+				partrel = leaf_part_rri->ri_RelationDesc;
+
+				/*
+				 * This is required when converting tuples to the root
+				 * partition's tuple descriptor; it was not set when the
+				 * UPDATE plans were generated.
+				 */
+				leaf_part_rri->ri_PartitionRoot = rel;
+			}
+			else
+				leaf_part_rri = (ResultRelInfo *) palloc0(sizeof(ResultRelInfo));
+		}
+		else
+		{
+			/* For INSERTs, we already have an array of result rels allocated */
+			leaf_part_rri = leaf_part_arr + i;
+		}
 
 		/*
-		 * We locked all the partitions above including the leaf partitions.
-		 * Note that each of the relations in *partitions are eventually
-		 * closed by the caller.
+		 * If we didn't open the partition rel, it means we haven't initialized
+		 * the result rel either.
 		 */
-		partrel = heap_open(lfirst_oid(cell), NoLock);
+		if (!partrel)
+		{
+			/*
+			 * We locked all the partitions above including the leaf partitions.
+			 * Note that each of the newly opened relations in *partitions is
+			 * eventually closed by the caller.
+			 */
+			partrel = heap_open(leaf_oid, NoLock);
+			InitResultRelInfo(leaf_part_rri, partrel, 1 /* dummy */, rel, 0);
+		}
+
 		part_tupdesc = RelationGetDescr(partrel);
 
 		/*
-		 * Verify result relation is a valid target for the current operation.
+		 * Verify result relation is a valid target for an INSERT.  Even for
+		 * UPDATEs, we arrive here via tuple routing, so the partition must be
+		 * checked for validity as an INSERT target.
 		 */
 		CheckValidResultRel(partrel, CMD_INSERT);
 
@@ -3269,12 +3380,6 @@ ExecSetupPartitionTupleRouting(Relation rel,
 		(*tup_conv_maps)[i] = convert_tuples_by_name(tupDesc, part_tupdesc,
 													 gettext_noop("could not convert row type"));
 
-		InitResultRelInfo(leaf_part_rri,
-						  partrel,
-						  1,	/* dummy */
-						  rel,
-						  0);
-
 		/*
 		 * Open partition indices (remember we do not support ON CONFLICT in
 		 * case of partitioned tables, so we do not need support information
@@ -3284,9 +3389,12 @@ ExecSetupPartitionTupleRouting(Relation rel,
 			leaf_part_rri->ri_IndexRelationDescs == NULL)
 			ExecOpenIndices(leaf_part_rri, false);
 
-		leaf_part_rri++;
+		(*partitions)[i] = leaf_part_rri;
 		i++;
 	}
+
+	if (result_rel_oids != NULL)
+		hash_destroy(result_rel_oids);
 }
 
 /*
@@ -3312,8 +3420,9 @@ ExecFindPartition(ResultRelInfo *resultRelInfo, PartitionDispatch *pd,
 	 * First check the root table's partition constraint, if any.  No point in
 	 * routing the tuple it if it doesn't belong in the root table itself.
 	 */
-	if (resultRelInfo->ri_PartitionCheck)
-		ExecPartitionCheck(resultRelInfo, slot, estate);
+	if (resultRelInfo->ri_PartitionCheck &&
+		!ExecPartitionCheck(resultRelInfo, slot, estate))
+		ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
 
 	result = get_partition_for_tuple(pd, slot, estate,
 									 &failed_at, &failed_slot);
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index bc53d07..eca60f2 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -402,7 +402,7 @@ ExecSimpleRelationInsert(EState *estate, TupleTableSlot *slot)
 
 		/* Check the constraints of the tuple */
 		if (rel->rd_att->constr)
-			ExecConstraints(resultRelInfo, slot, estate);
+			ExecConstraints(resultRelInfo, slot, estate, true);
 
 		/* Store the slot into tuple that we can inspect. */
 		tuple = ExecMaterializeSlot(slot);
@@ -467,7 +467,7 @@ ExecSimpleRelationUpdate(EState *estate, EPQState *epqstate,
 
 		/* Check the constraints of the tuple */
 		if (rel->rd_att->constr)
-			ExecConstraints(resultRelInfo, slot, estate);
+			ExecConstraints(resultRelInfo, slot, estate, true);
 
 		/* Store the slot into tuple that we can write. */
 		tuple = ExecMaterializeSlot(slot);
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 8d17425..51931f4 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -45,6 +45,7 @@
 #include "foreign/fdwapi.h"
 #include "miscadmin.h"
 #include "nodes/nodeFuncs.h"
+#include "optimizer/var.h"
 #include "parser/parsetree.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
@@ -53,6 +54,8 @@
 #include "utils/rel.h"
 #include "utils/tqual.h"
 
+#define GetUpdatedColumns(relinfo, estate) \
+	(rt_fetch((relinfo)->ri_RangeTableIndex, (estate)->es_range_table)->updatedCols)
 
 static bool ExecOnConflictUpdate(ModifyTableState *mtstate,
 					 ResultRelInfo *resultRelInfo,
@@ -239,6 +242,34 @@ ExecCheckTIDVisible(EState *estate,
 	ReleaseBuffer(buffer);
 }
 
+/*
+ * ConvertPartitionTupleSlot -- convenience function for converting a tuple
+ * and storing it in the dedicated partition tuple slot.  The partition tuple
+ * slot is passed back in the output parameter *p_slot.  If no conversion map
+ * is present, the tuple and *p_slot are left unchanged.
+ *
+ * Returns the converted tuple.
+ */
+static HeapTuple
+ConvertPartitionTupleSlot(ModifyTableState *mtstate, TupleConversionMap *map,
+						  HeapTuple tuple, TupleTableSlot **p_slot)
+{
+	if (!map)
+		return tuple;
+
+	tuple = do_convert_tuple(tuple, map);
+
+	/*
+	 * Change the partition tuple slot's descriptor to match the converted tuple.
+	 */
+	*p_slot = mtstate->mt_partition_tuple_slot;
+	Assert(*p_slot != NULL);
+	ExecSetSlotDescriptor(*p_slot, map->outdesc);
+	ExecStoreTuple(tuple, *p_slot, InvalidBuffer, true);
+
+	return tuple;
+}
+
 /* ----------------------------------------------------------------
  *		ExecInsert
  *
@@ -280,7 +311,38 @@ ExecInsert(ModifyTableState *mtstate,
 	if (mtstate->mt_partition_dispatch_info)
 	{
 		int			leaf_part_index;
-		TupleConversionMap *map;
+		ResultRelInfo *rootResultRelInfo;
+
+		/*
+		 * If the original operation is UPDATE, the root partition rel needs to
+		 * be fetched from mtstate->rootResultRelInfo.
+		 */
+		rootResultRelInfo = (mtstate->rootResultRelInfo ?
+							 mtstate->rootResultRelInfo : resultRelInfo);
+
+		/*
+		 * If the resultRelInfo is not the root partition (which happens for
+		 * UPDATE), we should convert the tuple into the root partition's
+		 * tuple descriptor, since ExecFindPartition() starts the search from
+		 * the root.  The tuple conversion map list is in the order of
+		 * mtstate->resultRelInfo[], so to retrieve the map for this resultRel
+		 * we need to know its position in mtstate->resultRelInfo[].
+		 * Note: we assume that if the resultRelInfo does not belong to the
+		 * subplans, then it already matches the root tuple descriptor,
+		 * although no such scenario is currently known.
+		 */
+		if (rootResultRelInfo != resultRelInfo &&
+			mtstate->mt_resultrel_maps != NULL &&
+			resultRelInfo >= mtstate->resultRelInfo &&
+			resultRelInfo <= mtstate->resultRelInfo + mtstate->mt_nplans-1)
+		{
+			int		map_index = resultRelInfo - mtstate->resultRelInfo;
+
+			tuple = ConvertPartitionTupleSlot(mtstate,
+									  mtstate->mt_resultrel_maps[map_index],
+									  tuple, &slot);
+		}
 
 		/*
 		 * Away we go ... If we end up not finding a partition after all,
@@ -290,7 +352,7 @@ ExecInsert(ModifyTableState *mtstate,
 		 * the ResultRelInfo and TupleConversionMap for the partition,
 		 * respectively.
 		 */
-		leaf_part_index = ExecFindPartition(resultRelInfo,
+		leaf_part_index = ExecFindPartition(rootResultRelInfo,
 											mtstate->mt_partition_dispatch_info,
 											slot,
 											estate);
@@ -302,7 +364,7 @@ ExecInsert(ModifyTableState *mtstate,
 		 * the selected partition.
 		 */
 		saved_resultRelInfo = resultRelInfo;
-		resultRelInfo = mtstate->mt_partitions + leaf_part_index;
+		resultRelInfo = mtstate->mt_partitions[leaf_part_index];
 
 		/* We do not yet have a way to insert into a foreign partition */
 		if (resultRelInfo->ri_FdwRoutine)
@@ -347,23 +409,9 @@ ExecInsert(ModifyTableState *mtstate,
 		 * We might need to convert from the parent rowtype to the partition
 		 * rowtype.
 		 */
-		map = mtstate->mt_partition_tupconv_maps[leaf_part_index];
-		if (map)
-		{
-			Relation	partrel = resultRelInfo->ri_RelationDesc;
-
-			tuple = do_convert_tuple(tuple, map);
-
-			/*
-			 * We must use the partition's tuple descriptor from this point
-			 * on, until we're finished dealing with the partition. Use the
-			 * dedicated slot for that.
-			 */
-			slot = mtstate->mt_partition_tuple_slot;
-			Assert(slot != NULL);
-			ExecSetSlotDescriptor(slot, RelationGetDescr(partrel));
-			ExecStoreTuple(tuple, slot, InvalidBuffer, true);
-		}
+		tuple = ConvertPartitionTupleSlot(mtstate,
+						mtstate->mt_partition_tupconv_maps[leaf_part_index],
+						tuple, &slot);
 	}
 
 	resultRelationDesc = resultRelInfo->ri_RelationDesc;
@@ -481,7 +529,7 @@ ExecInsert(ModifyTableState *mtstate,
 
 		/* Check the constraints of the tuple */
 		if (resultRelationDesc->rd_att->constr || check_partition_constr)
-			ExecConstraints(resultRelInfo, slot, estate);
+			ExecConstraints(resultRelInfo, slot, estate, true);
 
 		if (onconflict != ONCONFLICT_NONE && resultRelInfo->ri_NumIndices > 0)
 		{
@@ -673,6 +721,8 @@ ExecDelete(ModifyTableState *mtstate,
 		   TupleTableSlot *planSlot,
 		   EPQState *epqstate,
 		   EState *estate,
+		   bool   *concurrently_deleted,
+		   bool process_returning,
 		   bool canSetTag)
 {
 	ResultRelInfo *resultRelInfo;
@@ -681,6 +731,9 @@ ExecDelete(ModifyTableState *mtstate,
 	HeapUpdateFailureData hufd;
 	TupleTableSlot *slot = NULL;
 
+	if (concurrently_deleted)
+		*concurrently_deleted = false;
+
 	/*
 	 * get information on the (current) result relation
 	 */
@@ -824,6 +877,8 @@ ldelete:;
 					}
 				}
 				/* tuple already deleted; nothing to do */
+				if (concurrently_deleted)
+					*concurrently_deleted = true;
 				return NULL;
 
 			default:
@@ -848,8 +903,8 @@ ldelete:;
 	ExecARDeleteTriggers(estate, resultRelInfo, tupleid, oldtuple,
 						 mtstate->mt_transition_capture);
 
-	/* Process RETURNING if present */
-	if (resultRelInfo->ri_projectReturning)
+	/* Process RETURNING if present and if requested */
+	if (process_returning && resultRelInfo->ri_projectReturning)
 	{
 		/*
 		 * We have to put the target tuple into a slot, which means first we
@@ -942,6 +997,8 @@ ExecUpdate(ModifyTableState *mtstate,
 	HTSU_Result result;
 	HeapUpdateFailureData hufd;
 	List	   *recheckIndexes = NIL;
+	bool		partition_check_passed = true;
+	bool		has_br_trigger;
 
 	/*
 	 * abort the operation if not running transactions
@@ -962,16 +1019,56 @@ ExecUpdate(ModifyTableState *mtstate,
 	resultRelationDesc = resultRelInfo->ri_RelationDesc;
 
 	/* BEFORE ROW UPDATE Triggers */
-	if (resultRelInfo->ri_TrigDesc &&
-		resultRelInfo->ri_TrigDesc->trig_update_before_row)
+	has_br_trigger = (resultRelInfo->ri_TrigDesc &&
+					  resultRelInfo->ri_TrigDesc->trig_update_before_row);
+
+	if (has_br_trigger)
 	{
-		slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
-									tupleid, oldtuple, slot);
+		TupleTableSlot *trig_slot;
 
-		if (slot == NULL)		/* "do nothing" */
+		trig_slot = ExecBRUpdateTriggers(estate, epqstate, resultRelInfo,
+										 tupleid, oldtuple, slot);
+
+		if (trig_slot == NULL)		/* "do nothing" */
 			return NULL;
 
+		if (resultRelInfo->ri_PartitionCheck)
+		{
+			bool		partition_check_passed_with_trig_tuple;
+
+			partition_check_passed =
+				(resultRelInfo->ri_PartitionCheck &&
+				 ExecPartitionCheck(resultRelInfo, slot, estate));
+
+			partition_check_passed_with_trig_tuple =
+				(resultRelInfo->ri_PartitionCheck &&
+				 ExecPartitionCheck(resultRelInfo, trig_slot, estate));
+
+			if (partition_check_passed)
+			{
+				/*
+				 * If it's the trigger that is causing partition constraint
+				 * violation, abort. We don't want a trigger to cause tuple
+				 * routing.
+				 */
+				if (!partition_check_passed_with_trig_tuple)
+					ExecPartitionCheckEmitError(resultRelInfo,
+												trig_slot, estate);
+			}
+			else
+			{
+				/*
+				 * The partition constraint failed with the original NEW tuple.
+				 * But the trigger might have modified the tuple so that it
+				 * fits back into the partition, so the partition constraint
+				 * check should be based on the *final* NEW tuple.
+				 */
+				partition_check_passed = partition_check_passed_with_trig_tuple;
+			}
+		}
+
 		/* trigger might have changed tuple */
+		slot = trig_slot;
 		tuple = ExecMaterializeSlot(slot);
 	}
 
@@ -1038,12 +1135,60 @@ lreplace:;
 								 resultRelInfo, slot, estate);
 
 		/*
+		 * If the partition check fails, try to move the row into the right
+		 * partition.  When there is a BR trigger, the tuple has already gone
+		 * through EPQ and has been locked, so it won't change again; hence we
+		 * avoid an extra partition check here if we already did it above in
+		 * the presence of BR triggers.
+		 */
+		if (!has_br_trigger)
+		{
+			partition_check_passed =
+				(!resultRelInfo->ri_PartitionCheck ||
+				ExecPartitionCheck(resultRelInfo, slot, estate));
+		}
+
+		if (!partition_check_passed)
+		{
+			bool	concurrently_deleted;
+
+			/*
+			 * When an UPDATE is run directly on a leaf partition, we do not
+			 * have partition tuple routing set up.  In that case, fail with a
+			 * partition constraint violation error.
+			 */
+			if (mtstate->mt_partition_dispatch_info == NULL)
+				ExecPartitionCheckEmitError(resultRelInfo, slot, estate);
+
+			/* Do the row movement. */
+
+			/*
+			 * Skip RETURNING processing for DELETE. We want to return rows
+			 * from INSERT.
+			 */
+			ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate,
+					   &concurrently_deleted, false, false);
+
+			/*
+			 * The row was already deleted by a concurrent DELETE. So we don't
+			 * have anything to update.
+			 */
+			if (concurrently_deleted)
+				return NULL;
+
+			return ExecInsert(mtstate, slot, planSlot, NULL,
+								  ONCONFLICT_NONE, estate, canSetTag);
+		}
+
+		/*
 		 * Check the constraints of the tuple.  Note that we pass the same
 		 * slot for the orig_slot argument, because unlike ExecInsert(), no
 		 * tuple-routing is performed here, hence the slot remains unchanged.
+		 * We have already checked partition constraints above, so skip them
+		 * below.
 		 */
-		if (resultRelationDesc->rd_att->constr || resultRelInfo->ri_PartitionCheck)
-			ExecConstraints(resultRelInfo, slot, estate);
+		if (resultRelationDesc->rd_att->constr)
+			ExecConstraints(resultRelInfo, slot, estate, false);
 
 		/*
 		 * replace the heap tuple
@@ -1462,6 +1607,36 @@ fireASTriggers(ModifyTableState *node)
 }
 
 /*
+ * Check whether the partition key is modified in any of the given relations.
+ */
+static bool
+IsPartitionKeyUpdate(EState *estate, ResultRelInfo *result_rels, int num_rels)
+{
+	int		i;
+
+	/*
+	 * Each result relation has its set of updated columns stored according to
+	 * its own column ordering.  So, for each relation, pull the attnos
+	 * referenced by its partition quals and check whether any of the updated
+	 * columns appear among them.
+	 */
+	for (i = 0; i < num_rels; i++)
+	{
+		ResultRelInfo *resultRelInfo = &result_rels[i];
+		Relation		rel = resultRelInfo->ri_RelationDesc;
+		Bitmapset	  *expr_attrs = NULL;
+
+		pull_varattnos((Node *) rel->rd_partcheck, 1, &expr_attrs);
+
+		/* Both bitmaps are offset by FirstLowInvalidHeapAttributeNumber. */
+		if (bms_overlap(expr_attrs, GetUpdatedColumns(resultRelInfo, estate)))
+			return true;
+	}
+
+	return false;
+}
+
+/*
  * Set up the state needed for collecting transition tuples for AFTER
  * triggers.
  */
@@ -1482,23 +1657,22 @@ ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate)
 	 */
 	if (mtstate->mt_transition_capture != NULL)
 	{
-		ResultRelInfo *resultRelInfos;
+		ResultRelInfo *resultRelInfo;
 		int		numResultRelInfos;
+		bool	tuple_routing = (mtstate->mt_partition_dispatch_info != NULL);
 
 		/* Find the set of partitions so that we can find their TupleDescs. */
-		if (mtstate->mt_partition_dispatch_info != NULL)
+		if (tuple_routing)
 		{
 			/*
 			 * For INSERT via partitioned table, so we need TupleDescs based
 			 * on the partition routing table.
 			 */
-			resultRelInfos = mtstate->mt_partitions;
 			numResultRelInfos = mtstate->mt_num_partitions;
 		}
 		else
 		{
 			/* Otherwise we need the ResultRelInfo for each subplan. */
-			resultRelInfos = mtstate->resultRelInfo;
 			numResultRelInfos = mtstate->mt_nplans;
 		}
 
@@ -1512,8 +1686,15 @@ ExecSetupTransitionCaptureState(ModifyTableState *mtstate, EState *estate)
 			palloc0(sizeof(TupleConversionMap *) * numResultRelInfos);
 		for (i = 0; i < numResultRelInfos; ++i)
 		{
+			/*
+			 * As stated above, the source of the mapping differs depending on
+			 * whether tuple routing is in use.
+			 */
+			resultRelInfo = (tuple_routing ?
+					mtstate->mt_partitions[i] : &mtstate->resultRelInfo[i]);
+
 			mtstate->mt_transition_tupconv_maps[i] =
-				convert_tuples_by_name(RelationGetDescr(resultRelInfos[i].ri_RelationDesc),
+				convert_tuples_by_name(RelationGetDescr(resultRelInfo->ri_RelationDesc),
 									   RelationGetDescr(targetRelInfo->ri_RelationDesc),
 									   gettext_noop("could not convert row type"));
 		}
@@ -1746,7 +1927,8 @@ ExecModifyTable(ModifyTableState *node)
 				break;
 			case CMD_DELETE:
 				slot = ExecDelete(node, tupleid, oldtuple, planSlot,
-								  &node->mt_epqstate, estate, node->canSetTag);
+								  &node->mt_epqstate, estate,
+								  NULL, true, node->canSetTag);
 				break;
 			default:
 				elog(ERROR, "unknown operation");
@@ -1786,11 +1968,14 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 {
 	ModifyTableState *mtstate;
 	CmdType		operation = node->operation;
+	bool		is_partitionkey_update = false;
 	int			nplans = list_length(node->plans);
 	ResultRelInfo *saved_resultRelInfo;
 	ResultRelInfo *resultRelInfo;
 	TupleDesc	tupDesc;
 	Plan	   *subplan;
+	int			firstVarno = 0;
+	Relation	firstResultRel = NULL;
 	ListCell   *l;
 	int			i;
 	Relation	rel;
@@ -1902,18 +2087,30 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	else
 		rel = mtstate->resultRelInfo->ri_RelationDesc;
 
-	/* Build state for INSERT tuple routing */
-	if (operation == CMD_INSERT &&
-		rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	/* Remember whether it is going to be an update of partition key. */
+	is_partitionkey_update =
+				(rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+				operation == CMD_UPDATE &&
+				IsPartitionKeyUpdate(estate, mtstate->resultRelInfo, nplans));
+
+	/*
+	 * Build state for tuple routing if it's an INSERT or an UPDATE of the
+	 * partition key.
+	 */
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE &&
+		(operation == CMD_INSERT || is_partitionkey_update))
 	{
 		PartitionDispatch *partition_dispatch_info;
-		ResultRelInfo *partitions;
+		ResultRelInfo **partitions;
 		TupleConversionMap **partition_tupconv_maps;
 		TupleTableSlot *partition_tuple_slot;
 		int			num_parted,
 					num_partitions;
 
 		ExecSetupPartitionTupleRouting(rel,
+									   (operation == CMD_UPDATE ?
+											mtstate->resultRelInfo : NULL),
+									   (operation == CMD_UPDATE ? nplans : 0),
 									   &partition_dispatch_info,
 									   &partitions,
 									   &partition_tupconv_maps,
@@ -1925,6 +2122,43 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 		mtstate->mt_num_partitions = num_partitions;
 		mtstate->mt_partition_tupconv_maps = partition_tupconv_maps;
 		mtstate->mt_partition_tuple_slot = partition_tuple_slot;
+
+		/*
+		 * The following are needed as reference objects for mapping partition
+		 * attnos in expressions such as WCO and RETURNING.
+		 */
+		firstVarno = mtstate->resultRelInfo[0].ri_RangeTableIndex;
+		firstResultRel = mtstate->resultRelInfo[0].ri_RelationDesc;
+	}
+
+	/*
+	 * Construct a mapping from each of the resultRelInfo attnos to the root
+	 * attnos.  This is required when, during UPDATE row movement, the tuple
+	 * descriptor of a source partition does not match the root partition's
+	 * descriptor.  In that case we need to convert tuples to the root
+	 * partition's tuple descriptor, because the search for the destination
+	 * partition starts from the root.  Skip this setup if it's not a
+	 * partition-key update or if there are no partitions below this
+	 * partitioned table.
+	 */
+	if (is_partitionkey_update && mtstate->mt_num_partitions > 0)
+	{
+		TupleConversionMap **tup_conv_maps;
+		TupleDesc		outdesc;
+
+		mtstate->mt_resultrel_maps =
+		(TupleConversionMap **) palloc0(sizeof(TupleConversionMap*) * nplans);
+
+		/* Get tuple descriptor of the root partition. */
+		outdesc = RelationGetDescr(mtstate->mt_partition_dispatch_info[0]->reldesc);
+
+		resultRelInfo = mtstate->resultRelInfo;
+		tup_conv_maps = mtstate->mt_resultrel_maps;
+		for (i = 0; i < nplans; i++)
+		{
+			TupleDesc indesc = RelationGetDescr(resultRelInfo[i].ri_RelationDesc);
+			tup_conv_maps[i] = convert_tuples_by_name(indesc, outdesc,
+								 gettext_noop("could not convert row type"));
+		}
 	}
 
 	/* Build state for collecting transition tuples */
@@ -1960,50 +2194,52 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	 * Build WITH CHECK OPTION constraints for each leaf partition rel. Note
 	 * that we didn't build the withCheckOptionList for each partition within
 	 * the planner, but simple translation of the varattnos for each partition
-	 * will suffice.  This only occurs for the INSERT case; UPDATE/DELETE
-	 * cases are handled above.
+	 * will suffice.  This only occurs for the INSERT case or for UPDATE
+	 * row movement. DELETEs and local UPDATEs are handled above.
 	 */
 	if (node->withCheckOptionLists != NIL && mtstate->mt_num_partitions > 0)
 	{
-		List	   *wcoList;
-		PlanState  *plan;
+		List	   *firstWco;
 
 		/*
 		 * In case of INSERT on partitioned tables, there is only one plan.
 		 * Likewise, there is only one WITH CHECK OPTIONS list, not one per
-		 * partition.  We make a copy of the WCO qual for each partition; note
-		 * that, if there are SubPlans in there, they all end up attached to
-		 * the one parent Plan node.
+		 * partition.  Whereas for UPDATE, there are as many WCO lists as there
+		 * are plans.  In either case, use the WCO expression of the first
+		 * resultRelInfo as a reference to calculate attnos for the WCO
+		 * expression of each of the partitions. We make a copy of the WCO qual
+		 * for each partition. Note that, if there are SubPlans in there, they
+		 * all end up attached to the one parent Plan node.
 		 */
-		Assert(operation == CMD_INSERT &&
+		Assert(is_partitionkey_update ||
+			   (operation == CMD_INSERT &&
 			   list_length(node->withCheckOptionLists) == 1 &&
-			   mtstate->mt_nplans == 1);
-		wcoList = linitial(node->withCheckOptionLists);
-		plan = mtstate->mt_plans[0];
-		resultRelInfo = mtstate->mt_partitions;
+			   mtstate->mt_nplans == 1));
+
+		firstWco = linitial(node->withCheckOptionLists);
 		for (i = 0; i < mtstate->mt_num_partitions; i++)
 		{
-			Relation	partrel = resultRelInfo->ri_RelationDesc;
-			List	   *mapped_wcoList;
+			Relation	partrel;
+			List	   *mappedWco;
 			List	   *wcoExprs = NIL;
 			ListCell   *ll;
 
-			/* varno = node->nominalRelation */
-			mapped_wcoList = map_partition_varattnos(wcoList,
-													 node->nominalRelation,
-													 partrel, rel);
-			foreach(ll, mapped_wcoList)
+			resultRelInfo = mtstate->mt_partitions[i];
+
+			partrel = resultRelInfo->ri_RelationDesc;
+			mappedWco = map_partition_varattnos(firstWco, firstVarno,
+												partrel, firstResultRel);
+			foreach(ll, mappedWco)
 			{
 				WithCheckOption *wco = castNode(WithCheckOption, lfirst(ll));
 				ExprState  *wcoExpr = ExecInitQual(castNode(List, wco->qual),
-												   plan);
+												   &mtstate->ps);
 
 				wcoExprs = lappend(wcoExprs, wcoExpr);
 			}
 
-			resultRelInfo->ri_WithCheckOptions = mapped_wcoList;
+			resultRelInfo->ri_WithCheckOptions = mappedWco;
 			resultRelInfo->ri_WithCheckOptionExprs = wcoExprs;
-			resultRelInfo++;
 		}
 	}
 
@@ -2014,7 +2250,7 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 	{
 		TupleTableSlot *slot;
 		ExprContext *econtext;
-		List	   *returningList;
+		List	   *firstReturningList;
 
 		/*
 		 * Initialize result tuple slot and assign its rowtype using the first
@@ -2051,20 +2287,25 @@ ExecInitModifyTable(ModifyTable *node, EState *estate, int eflags)
 		 * Build a projection for each leaf partition rel.  Note that we
 		 * didn't build the returningList for each partition within the
 		 * planner, but simple translation of the varattnos for each partition
-		 * will suffice.  This only occurs for the INSERT case; UPDATE/DELETE
-		 * are handled above.
+		 * will suffice.  This only occurs for the INSERT case or for UPDATE
+		 * row movement. DELETEs and local UPDATEs are handled above.
 		 */
-		resultRelInfo = mtstate->mt_partitions;
-		returningList = linitial(node->returningLists);
+		firstReturningList = linitial(node->returningLists);
 		for (i = 0; i < mtstate->mt_num_partitions; i++)
 		{
-			Relation	partrel = resultRelInfo->ri_RelationDesc;
+			Relation	partrel;
 			List	   *rlist;
 
-			/* varno = node->nominalRelation */
-			rlist = map_partition_varattnos(returningList,
-											node->nominalRelation,
-											partrel, rel);
+			resultRelInfo = mtstate->mt_partitions[i];
+			partrel = resultRelInfo->ri_RelationDesc;
+
+			/*
+			 * Use the RETURNING expression of the first resultRelInfo as a
+			 * reference to calculate attnos for the RETURNING expression of
+			 * each of the partitions.
+			 */
+			rlist = map_partition_varattnos(firstReturningList, firstVarno,
+											partrel, firstResultRel);
 			resultRelInfo->ri_projectReturning =
 				ExecBuildProjectionInfo(rlist, econtext, slot, &mtstate->ps,
 										resultRelInfo->ri_RelationDesc->rd_att);
@@ -2307,6 +2548,7 @@ void
 ExecEndModifyTable(ModifyTableState *node)
 {
 	int			i;
+	CmdType		operation = node->operation;
 
 	/* Free transition tables */
 	if (node->mt_transition_capture != NULL)
@@ -2343,7 +2585,17 @@ ExecEndModifyTable(ModifyTableState *node)
 	}
 	for (i = 0; i < node->mt_num_partitions; i++)
 	{
-		ResultRelInfo *resultRelInfo = node->mt_partitions + i;
+		ResultRelInfo *resultRelInfo = node->mt_partitions[i];
+
+		/*
+		 * If this result rel is one of the subplan result rels, let
+		 * ExecEndPlan() close it.  For INSERTs this does not apply, because
+		 * all leaf partition result rels are newly allocated anyway.
+		 */
+		if (operation == CMD_UPDATE &&
+			resultRelInfo >= node->resultRelInfo &&
+			resultRelInfo < node->resultRelInfo + node->mt_nplans)
+			continue;
 
 		ExecCloseIndices(resultRelInfo);
 		heap_close(resultRelInfo->ri_RelationDesc, NoLock);
diff --git a/src/include/catalog/partition.h b/src/include/catalog/partition.h
index f10879a..b1a60c2 100644
--- a/src/include/catalog/partition.h
+++ b/src/include/catalog/partition.h
@@ -79,8 +79,8 @@ extern void check_new_partition_bound(char *relname, Relation parent,
 extern Oid	get_partition_parent(Oid relid);
 extern List *get_qual_from_partbound(Relation rel, Relation parent,
 						PartitionBoundSpec *spec);
-extern List *map_partition_varattnos(List *expr, int target_varno,
-						Relation partrel, Relation parent);
+extern List *map_partition_varattnos(List *expr, int fromrel_varno,
+						Relation to_rel, Relation from_rel);
 extern List *RelationGetPartitionQual(Relation rel);
 extern Expr *get_partition_qual_relid(Oid relid);
 
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index e25cfa3..ea4205d 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -187,7 +187,10 @@ extern ResultRelInfo *ExecGetTriggerResultRel(EState *estate, Oid relid);
 extern void ExecCleanUpTriggerState(EState *estate);
 extern bool ExecContextForcesOids(PlanState *planstate, bool *hasoids);
 extern void ExecConstraints(ResultRelInfo *resultRelInfo,
-				TupleTableSlot *slot, EState *estate);
+				TupleTableSlot *slot, EState *estate,
+				bool check_partition_constraint);
+extern void ExecPartitionCheckEmitError(ResultRelInfo *resultRelInfo,
+									TupleTableSlot *slot, EState *estate);
 extern void ExecWithCheckOptions(WCOKind kind, ResultRelInfo *resultRelInfo,
 					 TupleTableSlot *slot, EState *estate);
 extern LockTupleMode ExecUpdateLockMode(EState *estate, ResultRelInfo *relinfo);
@@ -207,8 +210,10 @@ extern void EvalPlanQualSetTuple(EPQState *epqstate, Index rti,
 					 HeapTuple tuple);
 extern HeapTuple EvalPlanQualGetTuple(EPQState *epqstate, Index rti);
 extern void ExecSetupPartitionTupleRouting(Relation rel,
+							   ResultRelInfo *update_rri,
+							   int num_update_rri,
 							   PartitionDispatch **pd,
-							   ResultRelInfo **partitions,
+							   ResultRelInfo ***partitions,
 							   TupleConversionMap ***tup_conv_maps,
 							   TupleTableSlot **partition_tuple_slot,
 							   int *num_parted, int *num_partitions);
@@ -216,6 +221,8 @@ extern int ExecFindPartition(ResultRelInfo *resultRelInfo,
 				  PartitionDispatch *pd,
 				  TupleTableSlot *slot,
 				  EState *estate);
+extern bool ExecPartitionCheck(ResultRelInfo *resultRelInfo,
+							TupleTableSlot *slot, EState *estate);
 
 #define EvalPlanQualSetSlot(epqstate, slot)  ((epqstate)->origslot = (slot))
 extern void EvalPlanQualFetchRowMarks(EPQState *epqstate);
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 85fac8a..276b65b 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -959,9 +959,13 @@ typedef struct ModifyTableState
 	int			mt_num_dispatch;	/* Number of entries in the above array */
 	int			mt_num_partitions;	/* Number of members in the following
 									 * arrays */
-	ResultRelInfo *mt_partitions;	/* Per partition result relation */
-	TupleConversionMap **mt_partition_tupconv_maps;
+	ResultRelInfo **mt_partitions;	/* Per partition result relation pointers */
+
 	/* Per partition tuple conversion map */
+	TupleConversionMap **mt_partition_tupconv_maps;
+	/* Per resultRelInfo conversion map to convert tuples to root partition */
+	TupleConversionMap **mt_resultrel_maps;
+
 	TupleTableSlot *mt_partition_tuple_slot;
 	struct TransitionCaptureState *mt_transition_capture;
 									/* controls transition table population */
diff --git a/src/test/regress/expected/update.out b/src/test/regress/expected/update.out
index 9366f04..f3c03a7 100644
--- a/src/test/regress/expected/update.out
+++ b/src/test/regress/expected/update.out
@@ -198,25 +198,189 @@ INSERT INTO upsert_test VALUES (1, 'Bat') ON CONFLICT(a)
 
 DROP TABLE update_test;
 DROP TABLE upsert_test;
--- update to a partition should check partition bound constraint for the new tuple
-create table range_parted (
+-- update to a partition should check partition bound constraint for the new tuple.
+-- If the partition key is updated, the row should be moved to the appropriate
+-- partition.  Updatable views using partitions should enforce the check
+-- options for the rows that have been moved.
+create table mintab(c1 int);
+insert into mintab values (120);
+CREATE TABLE range_parted (
 	a text,
-	b int
+	b int,
+	c int
 ) partition by range (a, b);
+CREATE VIEW upview AS SELECT * FROM range_parted WHERE (select c > c1 from mintab) WITH CHECK OPTION;
 create table part_a_1_a_10 partition of range_parted for values from ('a', 1) to ('a', 10);
 create table part_a_10_a_20 partition of range_parted for values from ('a', 10) to ('a', 20);
 create table part_b_1_b_10 partition of range_parted for values from ('b', 1) to ('b', 10);
-create table part_b_10_b_20 partition of range_parted for values from ('b', 10) to ('b', 20);
+create table part_b_10_b_20 partition of range_parted for values from ('b', 10) to ('b', 20) partition by range (c);
+-- This tests partition-key UPDATE on a partitioned table that does not have any child partitions
+update part_b_10_b_20 set b = b - 6;
+create table part_c_1_100 (b int, c int, a text);
+alter table part_b_10_b_20 attach partition part_c_1_100 for values from (1) to (100);
+create table part_c_100_200 (c int, a text, b int);
+alter table part_b_10_b_20 attach partition part_c_100_200 for values from (100) to (200);
 insert into part_a_1_a_10 values ('a', 1);
-insert into part_b_10_b_20 values ('b', 10);
--- fail
-update part_a_1_a_10 set a = 'b' where a = 'a';
-ERROR:  new row for relation "part_a_1_a_10" violates partition constraint
-DETAIL:  Failing row contains (b, 1).
-update range_parted set b = b - 1 where b = 10;
+insert into part_a_10_a_20 values ('a', 10, 200);
+insert into part_c_1_100 (a, b, c) values ('b', 12, 96);
+insert into part_c_1_100 (a, b, c) values ('b', 13, 97);
+insert into part_c_100_200 (a, b, c) values ('b', 15, 105);
+insert into part_c_100_200 (a, b, c) values ('b', 17, 105);
+-- fail (row movement happens only within the partition subtree) :
+update part_c_1_100 set c = c + 20 where c = 96;
+ERROR:  new row for relation "part_c_1_100" violates partition constraint
+DETAIL:  Failing row contains (12, 116, b).
+-- No row found :
+update part_c_1_100 set c = c + 20 where c = 98;
+-- ok (row movement)
+update part_b_10_b_20 set c = c + 20 returning c, b, a;
+  c  | b  | a 
+-----+----+---
+ 116 | 12 | b
+ 117 | 13 | b
+ 125 | 15 | b
+ 125 | 17 | b
+(4 rows)
+
+select a, b, c from part_c_1_100 order by 1, 2, 3;
+ a | b | c 
+---+---+---
+(0 rows)
+
+select a, b, c from part_c_100_200 order by 1, 2, 3;
+ a | b  |  c  
+---+----+-----
+ b | 12 | 116
+ b | 13 | 117
+ b | 15 | 125
+ b | 17 | 125
+(4 rows)
+
+-- fail (row movement happens only within the partition subtree) :
+update part_b_10_b_20 set b = b - 6 where c > 116 returning *;
 ERROR:  new row for relation "part_b_10_b_20" violates partition constraint
-DETAIL:  Failing row contains (b, 9).
--- ok
-update range_parted set b = b + 1 where b = 10;
+DETAIL:  Failing row contains (b, 7, 117).
+-- ok (row movement, with subset of rows moved into different partition)
+update range_parted set b = b - 6 where c > 116 returning a, b + c;
+ a | ?column? 
+---+----------
+ a |      204
+ b |      124
+ b |      134
+ b |      136
+(4 rows)
+
+select tableoid::regclass partname, * from range_parted order by 1, 2, 3, 4;
+    partname    | a | b  |  c  
+----------------+---+----+-----
+ part_a_1_a_10  | a |  1 |    
+ part_a_1_a_10  | a |  4 | 200
+ part_b_1_b_10  | b |  7 | 117
+ part_b_1_b_10  | b |  9 | 125
+ part_c_100_200 | b | 11 | 125
+ part_c_100_200 | b | 12 | 116
+(6 rows)
+
+-- update partition key using updatable view.
+-- succeeds
+update upview set c = 199 where b = 4;
+-- fail, check option violation
+update upview set c = 120 where b = 4;
+ERROR:  new row violates check option for view "upview"
+DETAIL:  Failing row contains (a, 4, 120).
+-- fail, row movement with check option violation
+update upview set a = 'b', b = 15, c = 120 where b = 4;
+ERROR:  new row violates check option for view "upview"
+DETAIL:  Failing row contains (120, b, 15).
+-- succeeds, row movement , check option passes
+update upview set a = 'b', b = 15 where b = 4;
+select tableoid::regclass partname, * from range_parted order by 1, 2, 3, 4;
+    partname    | a | b  |  c  
+----------------+---+----+-----
+ part_a_1_a_10  | a |  1 |    
+ part_b_1_b_10  | b |  7 | 117
+ part_b_1_b_10  | b |  9 | 125
+ part_c_100_200 | b | 11 | 125
+ part_c_100_200 | b | 12 | 116
+ part_c_100_200 | b | 15 | 199
+(6 rows)
+
 -- cleanup
-drop table range_parted;
+drop view upview;
+drop table mintab, range_parted;
+--------------
+-- UPDATE with
+-- partition key or non-partition columns, with different column ordering,
+-- triggers.
+--------------
+-- Setup
+--------
+create table list_parted (a int, b int, c int) partition by list (a);
+create table sub_parted partition of list_parted for values in (1) partition by list (b);
+create table sub_part1(b int, c int, a int);
+alter table sub_parted attach partition sub_part1 for values in (1);
+create table sub_part2(b int, c int, a int);
+alter table sub_parted attach partition sub_part2 for values in (2);
+create table list_part1(a int, b int, c int);
+alter table list_parted attach partition list_part1 for values in (2,3);
+insert into list_parted values (2,5,50);
+insert into list_parted values (3,6,60);
+insert into sub_parted values (1,1,60);
+insert into sub_parted values (1,2,10);
+-- Test partition constraint violation when intermediate ancestor is used and
+-- constraint is inherited from upper root.
+update sub_parted set a = 2 where c = 10;
+ERROR:  new row for relation "sub_parted" violates partition constraint
+DETAIL:  Failing row contains (2, 2, 10).
+-- UPDATE which does not modify partition key of partitions that are chosen for update.
+select tableoid::regclass , * from list_parted where a = 2 order by 1;
+  tableoid  | a | b | c  
+------------+---+---+----
+ list_part1 | 2 | 5 | 50
+(1 row)
+
+update list_parted set b = c + a where a = 2;
+select tableoid::regclass , * from list_parted where a = 2 order by 1;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ list_part1 | 2 | 52 | 50
+(1 row)
+
+-----------
+-- Triggers should not be allowed to initiate the update row movement
+-----------
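+-- (A BEFORE ROW trigger that changes the partition key of a leaf partition
+-- would require the row to be moved out of that partition, which is not
+-- supported; the UPDATE fails with a partition constraint violation instead,
+-- as the test below shows.)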
+create function func_parted_mod_b() returns trigger as $$
+begin
+   NEW.b = 2; -- This is changing the partition key column.
+   return NEW;
+end $$ language plpgsql;
+create trigger parted_mod_b before update on sub_part1
+   for each row execute procedure func_parted_mod_b();
+select tableoid::regclass, * from list_parted order by 1, 2, 3, 4;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ sub_part1  | 1 |  1 | 60
+ sub_part2  | 1 |  2 | 10
+ list_part1 | 2 | 52 | 50
+ list_part1 | 3 |  6 | 60
+(4 rows)
+
+-- This should fail because the trigger on sub_part1 would change column 'b',
+-- which would violate the "b in (1)" constraint.
+update list_parted set c = 70 where b = 1;
+ERROR:  new row for relation "sub_part1" violates partition constraint
+DETAIL:  Failing row contains (2, 70, 1).
+drop trigger parted_mod_b on sub_part1;
+-- Now that the trigger is dropped, the same update should succeed
+update list_parted set c = 70 where b = 1;
+select tableoid::regclass, * from list_parted order by 1, 2, 3, 4;
+  tableoid  | a | b  | c  
+------------+---+----+----
+ sub_part1  | 1 |  1 | 70
+ sub_part2  | 1 |  2 | 10
+ list_part1 | 2 | 52 | 50
+ list_part1 | 3 |  6 | 60
+(4 rows)
+
+drop function func_parted_mod_b();
+drop table list_parted;
diff --git a/src/test/regress/sql/update.sql b/src/test/regress/sql/update.sql
index 6637119..0113c7d 100644
--- a/src/test/regress/sql/update.sql
+++ b/src/test/regress/sql/update.sql
@@ -107,23 +107,128 @@ INSERT INTO upsert_test VALUES (1, 'Bat') ON CONFLICT(a)
 DROP TABLE update_test;
 DROP TABLE upsert_test;
 
--- update to a partition should check partition bound constraint for the new tuple
-create table range_parted (
+-- update to a partition should check partition bound constraint for the new tuple.
+-- If partition key is updated, the row should be moved to the appropriate
+-- partition. updatable views using partitions should enforce the check options
+-- for the rows that have been moved.
+create table mintab(c1 int);
+insert into mintab values (120);
+CREATE TABLE range_parted (
 	a text,
-	b int
+	b int,
+	c int
 ) partition by range (a, b);
+CREATE VIEW upview AS SELECT * FROM range_parted WHERE (select c > c1 from mintab) WITH CHECK OPTION;
+
 create table part_a_1_a_10 partition of range_parted for values from ('a', 1) to ('a', 10);
 create table part_a_10_a_20 partition of range_parted for values from ('a', 10) to ('a', 20);
 create table part_b_1_b_10 partition of range_parted for values from ('b', 1) to ('b', 10);
-create table part_b_10_b_20 partition of range_parted for values from ('b', 10) to ('b', 20);
-insert into part_a_1_a_10 values ('a', 1);
-insert into part_b_10_b_20 values ('b', 10);
+create table part_b_10_b_20 partition of range_parted for values from ('b', 10) to ('b', 20) partition by range (c);
+
+-- This tests partition-key UPDATE on a partitioned table that does not have any child partitions
+update part_b_10_b_20 set b = b - 6;
 
--- fail
-update part_a_1_a_10 set a = 'b' where a = 'a';
-update range_parted set b = b - 1 where b = 10;
--- ok
-update range_parted set b = b + 1 where b = 10;
+create table part_c_1_100 (b int, c int, a text);
+alter table part_b_10_b_20 attach partition part_c_1_100 for values from (1) to (100);
+create table part_c_100_200 (c int, a text, b int);
+alter table part_b_10_b_20 attach partition part_c_100_200 for values from (100) to (200);
+
+insert into part_a_1_a_10 values ('a', 1);
+insert into part_a_10_a_20 values ('a', 10, 200);
+insert into part_c_1_100 (a, b, c) values ('b', 12, 96);
+insert into part_c_1_100 (a, b, c) values ('b', 13, 97);
+insert into part_c_100_200 (a, b, c) values ('b', 15, 105);
+insert into part_c_100_200 (a, b, c) values ('b', 17, 105);
+
+-- fail (row movement happens only within the partition subtree) :
+update part_c_1_100 set c = c + 20 where c = 96;
+-- No row found :
+update part_c_1_100 set c = c + 20 where c = 98;
+-- ok (row movement)
+update part_b_10_b_20 set c = c + 20 returning c, b, a;
+select a, b, c from part_c_1_100 order by 1, 2, 3;
+select a, b, c from part_c_100_200 order by 1, 2, 3;
+
+-- fail (row movement happens only within the partition subtree) :
+update part_b_10_b_20 set b = b - 6 where c > 116 returning *;
+-- ok (row movement, with subset of rows moved into different partition)
+update range_parted set b = b - 6 where c > 116 returning a, b + c;
+
+select tableoid::regclass partname, * from range_parted order by 1, 2, 3, 4;
+
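+-- (As the select above shows, the rows whose new value of b fell below 10 were
+-- moved out of the part_b_10_b_20 subtree into part_b_1_b_10, and the
+-- part_a_10_a_20 row was moved into part_a_1_a_10.)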
+-- update partition key using updatable view.
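+-- (upview exposes only rows with c greater than mintab.c1, i.e. c > 120; the
+-- WITH CHECK OPTION must still hold for the updated row even when it gets
+-- moved to a different partition.)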
+
+-- succeeds
+update upview set c = 199 where b = 4;
+-- fail, check option violation
+update upview set c = 120 where b = 4;
+-- fail, row movement with check option violation
+update upview set a = 'b', b = 15, c = 120 where b = 4;
+-- succeeds, row movement, check option passes
+update upview set a = 'b', b = 15 where b = 4;
+
+select tableoid::regclass partname, * from range_parted order by 1, 2, 3, 4;
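+-- (The moved row now resides in part_c_100_200 as (b, 15, 199), which
+-- satisfies both the view's check option and that partition's bounds.)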
 
 -- cleanup
-drop table range_parted;
+drop view upview;
+drop table mintab, range_parted;
+
+
+
+--------------
+-- UPDATEs that modify the partition key or non-partition columns, with
+-- partitions having different column ordering, and with triggers.
+--------------
+
+-- Setup
+--------
+create table list_parted (a int, b int, c int) partition by list (a);
+create table sub_parted partition of list_parted for values in (1) partition by list (b);
+
+create table sub_part1(b int, c int, a int);
+alter table sub_parted attach partition sub_part1 for values in (1);
+create table sub_part2(b int, c int, a int);
+alter table sub_parted attach partition sub_part2 for values in (2);
+
+create table list_part1(a int, b int, c int);
+alter table list_parted attach partition list_part1 for values in (2,3);
+
+insert into list_parted values (2,5,50);
+insert into list_parted values (3,6,60);
+insert into sub_parted values (1,1,60);
+insert into sub_parted values (1,2,10);
+
+-- Test partition constraint violation when an intermediate ancestor is used
+-- and the constraint is inherited from the upper root.
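+-- (Row movement happens only within the partition subtree rooted at the table
+-- named in the UPDATE, here sub_parted, so the row cannot be moved to
+-- list_part1; the constraint inherited from list_parted is violated instead.)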
+update sub_parted set a = 2 where c = 10;
+
+-- UPDATE that does not modify the partition key of the partitions chosen for update.
+select tableoid::regclass, * from list_parted where a = 2 order by 1;
+update list_parted set b = c + a where a = 2;
+select tableoid::regclass, * from list_parted where a = 2 order by 1;
+
+
+-----------
+-- Triggers should not be allowed to initiate the update row movement
+-----------
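+-- (A BEFORE ROW trigger that changes the partition key of a leaf partition
+-- would require the row to be moved out of that partition, which is not
+-- supported; the UPDATE fails with a partition constraint violation instead,
+-- as the test below shows.)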
+create function func_parted_mod_b() returns trigger as $$
+begin
+   NEW.b = 2; -- This is changing the partition key column.
+   return NEW;
+end $$ language plpgsql;
+create trigger parted_mod_b before update on sub_part1
+   for each row execute procedure func_parted_mod_b();
+
+select tableoid::regclass, * from list_parted order by 1, 2, 3, 4;
+
+-- This should fail because the trigger on sub_part1 would change column 'b',
+-- which would violate the "b in (1)" constraint.
+update list_parted set c = 70 where b = 1;
+drop trigger parted_mod_b on sub_part1;
+-- Now that the trigger is dropped, the same update should succeed
+update list_parted set c = 70 where b = 1;
+select tableoid::regclass, * from list_parted order by 1, 2, 3, 4;
+
+drop function func_parted_mod_b();
+drop table list_parted;
