From 8d3aa99269334bff8b216086c09879e09232b40b Mon Sep 17 00:00:00 2001
From: Amul Sul <sulamul@gmail.com>
Date: Wed, 13 Sep 2017 15:30:09 +0530
Subject: [PATCH 2/2] invalidate_ctid-ip_blkid WIP_2

Set ctid.ip_blkid to InvalidBlockNumber while moving tuple to the
another partition.
---
 src/backend/access/heap/heapam.c       | 15 ++++++++++++---
 src/backend/access/heap/rewriteheap.c  |  3 ++-
 src/backend/commands/trigger.c         |  5 +++++
 src/backend/executor/execMain.c        |  4 ++++
 src/backend/executor/nodeLockRows.c    |  5 +++++
 src/backend/executor/nodeModifyTable.c | 21 +++++++++++++++++----
 src/include/access/heapam.h            |  2 +-
 7 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index d20f038..5764980 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -2281,7 +2281,8 @@ heap_get_latest_tid(Relation relation,
 		 */
 		if ((tp.t_data->t_infomask & HEAP_XMAX_INVALID) ||
 			HeapTupleHeaderIsOnlyLocked(tp.t_data) ||
-			ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid))
+			ItemPointerEquals(&tp.t_self, &tp.t_data->t_ctid) ||
+			!BlockNumberIsValid(BlockIdGetBlockNumber(&(tp.t_data->t_ctid).ip_blkid)))
 		{
 			UnlockReleaseBuffer(buffer);
 			break;
@@ -3027,7 +3028,7 @@ xmax_infomask_changed(uint16 new_infomask, uint16 old_infomask)
 HTSU_Result
 heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			HeapUpdateFailureData *hufd)
+			HeapUpdateFailureData *hufd, bool row_moved)
 {
 	HTSU_Result result;
 	TransactionId xid = GetCurrentTransactionId();
@@ -3295,6 +3296,13 @@ l1:
 	/* Make sure there is no forward chain link in t_ctid */
 	tp.t_data->t_ctid = tp.t_self;
 
+	/*
+	 * Sets a block identifier to the InvalidBlockNumber to indicate such an
+	 * update being moved tuple to an another partition.
+	 */
+	if (row_moved)
+		BlockIdSet(&((tp.t_data->t_ctid).ip_blkid), InvalidBlockNumber);
+
 	MarkBufferDirty(buffer);
 
 	/*
@@ -3420,7 +3428,7 @@ simple_heap_delete(Relation relation, ItemPointer tid)
 	result = heap_delete(relation, tid,
 						 GetCurrentCommandId(true), InvalidSnapshot,
 						 true /* wait for commit */ ,
-						 &hufd);
+						 &hufd, false);
 	switch (result)
 	{
 		case HeapTupleSelfUpdated:
@@ -5922,6 +5930,7 @@ next:
 		/* if we find the end of update chain, we're done. */
 		if (mytup.t_data->t_infomask & HEAP_XMAX_INVALID ||
 			ItemPointerEquals(&mytup.t_self, &mytup.t_data->t_ctid) ||
+			!BlockNumberIsValid(BlockIdGetBlockNumber(&(mytup.t_data->t_ctid).ip_blkid)) ||
 			HeapTupleHeaderIsOnlyLocked(mytup.t_data))
 		{
 			result = HeapTupleMayBeUpdated;
diff --git a/src/backend/access/heap/rewriteheap.c b/src/backend/access/heap/rewriteheap.c
index bd560e4..ebdc081 100644
--- a/src/backend/access/heap/rewriteheap.c
+++ b/src/backend/access/heap/rewriteheap.c
@@ -422,7 +422,8 @@ rewrite_heap_tuple(RewriteState state,
 	if (!((old_tuple->t_data->t_infomask & HEAP_XMAX_INVALID) ||
 		  HeapTupleHeaderIsOnlyLocked(old_tuple->t_data)) &&
 		!(ItemPointerEquals(&(old_tuple->t_self),
-							&(old_tuple->t_data->t_ctid))))
+							&(old_tuple->t_data->t_ctid))) &&
+		BlockNumberIsValid(BlockIdGetBlockNumber(&(old_tuple->t_data->t_ctid).ip_blkid)))
 	{
 		OldToNewMapping mapping;
 
diff --git a/src/backend/commands/trigger.c b/src/backend/commands/trigger.c
index f9ea29f..b01a961 100644
--- a/src/backend/commands/trigger.c
+++ b/src/backend/commands/trigger.c
@@ -3111,6 +3111,11 @@ ltrmark:;
 					ereport(ERROR,
 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 							 errmsg("could not serialize access due to concurrent update")));
+				if (!BlockNumberIsValid(BlockIdGetBlockNumber(&((hufd.ctid).ip_blkid))))
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("tuple to be updated was already moved to an another partition due to concurrent update")));
+
 				if (!ItemPointerEquals(&hufd.ctid, &tuple.t_self))
 				{
 					/* it was updated, so look at the updated version */
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 1508f72..35d172e 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -2704,6 +2704,10 @@ EvalPlanQualFetch(EState *estate, Relation relation, int lockmode,
 						ereport(ERROR,
 								(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 								 errmsg("could not serialize access due to concurrent update")));
+					if (!BlockNumberIsValid(BlockIdGetBlockNumber(&((hufd.ctid).ip_blkid))))
+						ereport(ERROR,
+								(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+								 errmsg("tuple to be updated was already moved to an another partition due to concurrent update")));
 
 					/* Should not encounter speculative tuple on recheck */
 					Assert(!HeapTupleHeaderIsSpeculative(tuple.t_data));
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index 9389560..1b388e6 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -218,6 +218,11 @@ lnext:
 					ereport(ERROR,
 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 							 errmsg("could not serialize access due to concurrent update")));
+				if (!BlockNumberIsValid(BlockIdGetBlockNumber(&((hufd.ctid).ip_blkid))))
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("tuple to be locked was already moved to an another partition due to concurrent update")));
+
 				if (ItemPointerEquals(&hufd.ctid, &tuple.t_self))
 				{
 					/* Tuple was deleted, so don't return it */
diff --git a/src/backend/executor/nodeModifyTable.c b/src/backend/executor/nodeModifyTable.c
index 400612b..7114734 100644
--- a/src/backend/executor/nodeModifyTable.c
+++ b/src/backend/executor/nodeModifyTable.c
@@ -740,7 +740,8 @@ ExecDelete(ModifyTableState *mtstate,
 		   EState *estate,
 		   bool *delete_skipped,
 		   bool process_returning,
-		   bool canSetTag)
+		   bool canSetTag,
+		   bool row_moved)
 {
 	ResultRelInfo *resultRelInfo;
 	Relation	resultRelationDesc;
@@ -830,7 +831,8 @@ ldelete:;
 							 estate->es_output_cid,
 							 estate->es_crosscheck_snapshot,
 							 true /* wait for commit */ ,
-							 &hufd);
+							 &hufd,
+							 row_moved);
 		switch (result)
 		{
 			case HeapTupleSelfUpdated:
@@ -876,6 +878,11 @@ ldelete:;
 					ereport(ERROR,
 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 							 errmsg("could not serialize access due to concurrent update")));
+				if (!BlockNumberIsValid(BlockIdGetBlockNumber(&((hufd.ctid).ip_blkid))))
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("tuple to be updated was already moved to an another partition due to concurrent update")));
+
 				if (!ItemPointerEquals(tupleid, &hufd.ctid))
 				{
 					TupleTableSlot *epqslot;
@@ -1151,7 +1158,7 @@ lreplace:;
 			 * from INSERT.
 			 */
 			ExecDelete(mtstate, tupleid, oldtuple, planSlot, epqstate, estate,
-					   &delete_skipped, false, false);
+					   &delete_skipped, false, false, true);
 
 			/*
 			 * For some reason if DELETE didn't happen (for e.g. trigger
@@ -1262,6 +1269,11 @@ lreplace:;
 					ereport(ERROR,
 							(errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
 							 errmsg("could not serialize access due to concurrent update")));
+				if (!BlockNumberIsValid(BlockIdGetBlockNumber(&((hufd.ctid).ip_blkid))))
+					ereport(ERROR,
+							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+							 errmsg("tuple to be updated was already moved to an another partition due to concurrent update")));
+
 				if (!ItemPointerEquals(tupleid, &hufd.ctid))
 				{
 					TupleTableSlot *epqslot;
@@ -1281,6 +1293,7 @@ lreplace:;
 						goto lreplace;
 					}
 				}
+
 				/* tuple already deleted; nothing to do */
 				return NULL;
 
@@ -2000,7 +2013,7 @@ ExecModifyTable(PlanState *pstate)
 			case CMD_DELETE:
 				slot = ExecDelete(node, tupleid, oldtuple, planSlot,
 								  &node->mt_epqstate, estate,
-								  NULL, true, node->canSetTag);
+								  NULL, true, node->canSetTag, false);
 				break;
 			default:
 				elog(ERROR, "unknown operation");
diff --git a/src/include/access/heapam.h b/src/include/access/heapam.h
index 4e41024..76f56cf 100644
--- a/src/include/access/heapam.h
+++ b/src/include/access/heapam.h
@@ -156,7 +156,7 @@ extern void heap_multi_insert(Relation relation, HeapTuple *tuples, int ntuples,
 				  CommandId cid, int options, BulkInsertState bistate);
 extern HTSU_Result heap_delete(Relation relation, ItemPointer tid,
 			CommandId cid, Snapshot crosscheck, bool wait,
-			HeapUpdateFailureData *hufd);
+			HeapUpdateFailureData *hufd, bool row_moved);
 extern void heap_finish_speculative(Relation relation, HeapTuple tuple);
 extern void heap_abort_speculative(Relation relation, HeapTuple tuple);
 extern HTSU_Result heap_update(Relation relation, ItemPointer otid,
-- 
2.6.2

