From 1d26203d07f50984ddae4a8928c9006c1da01f8b Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 18 Apr 2019 16:09:25 +0530
Subject: [PATCH 2/2] Provide interfaces to store and fetch undo records.

Add the capability to form undo records and store them in undo logs.  We
also provide the capability to fetch the undo records.  This layer will use
undo-log-storage to reserve the space for the undo records and buffer
management routines to write and read the undo records.

Undo records are stored in sequential order in the undo log.  Each undo
record consists of a variable length header, tuple data, and payload
information.  The undo records are stored without any sort of alignment
padding and an undo record can span across multiple pages.  The undo records
for a transaction can span across multiple undo logs.

Author: Dilip Kumar with contributions from Robert Haas, Amit Kapila,
        Thomas Munro and Rafia Sabih
Reviewed-by: Amit Kapila (earlier version)
Tested-by: Neha Sharma
Discussion: https://www.postgresql.org/message-id/CAFiTN-uVxxopn0UZ64%3DF-sydbETBbGjWapnBikNo1%3DXv78UeFw%40mail.gmail.com
---
 src/backend/access/transam/xact.c    |   37 +
 src/backend/access/undo/Makefile     |    2 +-
 src/backend/access/undo/undoinsert.c | 1257 ++++++++++++++++++++++++++++++++++
 src/backend/access/undo/undorecord.c |  494 +++++++++++++
 src/include/access/transam.h         |    1 +
 src/include/access/undoinsert.h      |   54 ++
 src/include/access/undorecord.h      |  201 ++++++
 src/include/access/xact.h            |    2 +
 8 files changed, 2047 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/undo/undoinsert.c
 create mode 100644 src/backend/access/undo/undorecord.c
 create mode 100644 src/include/access/undoinsert.h
 create mode 100644 src/include/access/undorecord.h

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index bd5024e..6747095 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -30,6 +30,7 @@
 #include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
+#include "access/undoinsert.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
@@ -68,6 +69,7 @@
 #include "utils/timestamp.h"
 #include "pg_trace.h"
 
+#define	AtAbort_ResetUndoBuffers() ResetUndoBuffers()
 
 /*
  *	User-tweakable parameters
@@ -192,6 +194,10 @@ typedef struct TransactionStateData
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;	/* Enter/ExitParallelMode counter */
 	bool		chain;			/* start a new block after this one */
+
+	/* start and latest undo record locations for each persistence level */
+	UndoRecPtr	start_urec_ptr[UndoPersistenceLevels];
+	UndoRecPtr	latest_urec_ptr[UndoPersistenceLevels];
 	struct TransactionStateData *parent;	/* back link to parent */
 } TransactionStateData;
 
@@ -998,6 +1004,35 @@ IsInParallelMode(void)
 }
 
 /*
+ * SetCurrentUndoLocation
+ *
+ * Update the start and the latest undo record pointers for the transaction.
+ *
+ * start_urec_ptr is set only for the transaction's first undo record, i.e.
+ * while start_urec_ptr is still invalid.  latest_urec_ptr is updated
+ * whenever a new undo record is inserted for the transaction.
+ *
+ * The start and latest undo record pointers are tracked separately for each
+ * persistence level.
+ */
+void
+SetCurrentUndoLocation(UndoRecPtr urec_ptr)
+{
+	UndoLogControl *log = UndoLogGet(UndoRecPtrGetLogNo(urec_ptr), false);
+	UndoPersistence upersistence = log->meta.persistence;
+
+	Assert(AmAttachedToUndoLog(log) || InRecovery);
+	/*
+	 * Set the start undo record pointer if this is the transaction's first
+	 * undo record at this persistence level.
+	 */
+	if (!UndoRecPtrIsValid(CurrentTransactionState->start_urec_ptr[upersistence]))
+		CurrentTransactionState->start_urec_ptr[upersistence] = urec_ptr;
+	CurrentTransactionState->latest_urec_ptr[upersistence] = urec_ptr;
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
@@ -2736,6 +2771,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
 		AtEOXact_ApplyLauncher(false);
+		AtAbort_ResetUndoBuffers();
 		pgstat_report_xact_timestamp(0);
 	}
 
@@ -4993,6 +5029,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
 		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
+		AtAbort_ResetUndoBuffers();
 	}
 
 	/*
diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile
index 219c696..f41e8f7 100644
--- a/src/backend/access/undo/Makefile
+++ b/src/backend/access/undo/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undolog.o
+OBJS = undoinsert.o undolog.o undorecord.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c
new file mode 100644
index 0000000..27cb5a7
--- /dev/null
+++ b/src/backend/access/undo/undoinsert.c
@@ -0,0 +1,1257 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoinsert.c
+ *	  entry points for inserting undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undoinsert.c
+ *
+ * NOTES:
+ * Undo record layout:
+ *
+ * Undo records are stored in sequential order in the undo log.  Each undo
+ * record consists of a variable length header, tuple data, and payload
+ * information.  The first undo record of each transaction contains a
+ * transaction header that points to the next transaction's start header.
+ * This allows us to discard the entire transaction's undo in one shot rather
+ * than record by record.  Callers are not aware of the transaction header;
+ * it is entirely maintained and used by the undo record layer.  See
+ * undorecord.h for detailed information about the undo record header.
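+ *
+ * Schematically (a sketch; which parts are present depends on the uur_info
+ * flags), each record is stored as:
+ *
+ *		+-----------+----------------+--------------+------------+--------+
+ *		| header(s) | payload header | payload data | tuple data | length |
+ *		+-----------+----------------+--------------+------------+--------+
+ *
+ * The trailing 2-byte length is the size of the whole record; it lets us
+ * walk backwards from a record to its predecessor (see
+ * UndoGetPrevRecordLen).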
+ *
+ * Multiple logs:
+ *
+ * It is possible that the undo records for a transaction span across
+ * multiple undo logs.  We need some special handling while inserting them to
+ * ensure that discard and rollback can work sanely.
+ *
+ * When the undo record for a transaction gets inserted into the next log, we
+ * insert a transaction header for the first record in the new log and update
+ * the transaction header with this new log's location.  We also keep a back
+ * pointer to the last undo record of the previous log in the first record of
+ * the new log, so that we can traverse to the previous record during
+ * rollback.  In case this is not the first record in the new log (i.e. the
+ * new log already contains some other transaction's data), we also update
+ * that transaction's next-start header with this new undo record's location.
+ * This allows us to connect a transaction's undo records across logs when
+ * the same transaction spans across logs.
+ *
+ * Rollbacks work somewhat differently when the undo for the same transaction
+ * spans across multiple logs, depending on which log is processed first by
+ * the discard worker.  If it processes the first log, which contains the
+ * transaction's first record, then it can get the last record of that
+ * transaction even if it is in a different log, and then process all the
+ * undo records from last to first.  OTOH, if the next log gets processed
+ * first, we don't need to trace back the actual start pointer of the
+ * transaction; rather, we only execute the undo actions from the current log
+ * and avoid re-executing them next time.  It is possible that after the undo
+ * actions have been executed, the undo gets discarded; at a later stage,
+ * while processing the previous log, we might try to fetch an undo record in
+ * the discarded log while chasing the transaction header chain, which can
+ * cause trouble.  We avoid this situation by first checking whether the
+ * next_urec of the transaction is already discarded and, if so, starting
+ * execution from the last undo record in the current log.
+ *
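+ * Typical call sequence (a sketch; WAL logging and error handling are
+ * omitted, and the undo record contents are up to the caller):
+ *
+ *		UndoSetPrepareSize(nrecords);		(only if nrecords > default)
+ *		urecptr = PrepareUndoInsert(&urec, fxid, persistence, xlog_record);
+ *		START_CRIT_SECTION();
+ *		InsertPreparedUndo();
+ *		... WAL-log the change; see RegisterUndoLogBuffers and
+ *		UndoLogBuffersSetLSN ...
+ *		END_CRIT_SECTION();
+ *		UnlockReleaseUndoBuffers();
+ *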
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/undorecord.h"
+#include "access/undoinsert.h"
+#include "access/undolog_xlog.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/tablecmds.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "miscadmin.h"
+
+/*
+ * XXX Do we want to support an undo tuple size larger than BLCKSZ?  If not,
+ * then an undo record can spread across at most 2 buffers.
+ */
+#define MAX_BUFFER_PER_UNDO    2
+
+/*
+ * This defines the number of undo records that can be prepared before
+ * calling insert by default.  If you need to prepare more than
+ * MAX_PREPARED_UNDO undo records, then you must call UndoSetPrepareSize
+ * first.
+ */
+#define MAX_PREPARED_UNDO 2
+
+/*
+ * This defines the max number of previous xact infos we need to update.
+ * Usually it's 1, for updating the next link of the previous transaction's
+ * header when we are starting a new transaction.  But in some cases, where
+ * the same transaction is spilled to the next log, we update our own
+ * transaction's header in the previous undo log as well as the header of
+ * the previous transaction in the new log.
+ */
+#define MAX_XACT_UNDO_INFO	2
+
+/*
+ * Consider buffers needed for updating previous transaction's
+ * starting undo record as well.
+ */
+#define MAX_UNDO_BUFFERS       ((MAX_PREPARED_UNDO + MAX_XACT_UNDO_INFO) * MAX_BUFFER_PER_UNDO)
+
+/* Undo block number to buffer mapping. */
+typedef struct UndoBuffers
+{
+	UndoLogNumber logno;		/* Undo log number */
+	BlockNumber blk;			/* block number */
+	Buffer		buf;			/* buffer allocated for the block */
+	bool		zero;			/* new block full of zeroes */
+} UndoBuffers;
+
+static UndoBuffers def_buffers[MAX_UNDO_BUFFERS];
+static int	buffer_idx;
+
+/*
+ * Structure to hold the prepared undo information.
+ */
+typedef struct PreparedUndoSpace
+{
+	UndoRecPtr	urp;			/* undo record pointer */
+	UnpackedUndoRecord *urec;	/* undo record */
+	uint16		size;			/* undo record size */
+	int			undo_buffer_idx[MAX_BUFFER_PER_UNDO];	/* undo_buffer array
+														 * index */
+} PreparedUndoSpace;
+
+static PreparedUndoSpace def_prepared[MAX_PREPARED_UNDO];
+static int	prepare_idx;
+static int	max_prepared_undo = MAX_PREPARED_UNDO;
+static UndoRecPtr prepared_urec_ptr = InvalidUndoRecPtr;
+
+/*
+ * By default, prepared_undo and undo_buffer point to static memory.  If the
+ * caller wants to prepare more than the default maximum number of undo
+ * records, the limit can be increased by calling UndoSetPrepareSize.  In
+ * that case, dynamic memory is allocated and prepared_undo and undo_buffer
+ * start pointing to the newly allocated memory, which is released by
+ * UnlockReleaseUndoBuffers, at which point these variables are set back to
+ * their default values.
+ */
+static PreparedUndoSpace *prepared_undo = def_prepared;
+static UndoBuffers *undo_buffer = def_buffers;
+
+/*
+ * Structure to hold the previous transaction's undo update information.  This
+ * is populated while the current transaction is updating its undo record
+ * pointer in the previous transaction's first undo record.
+ */
+typedef struct XactUndoRecordInfo
+{
+	UndoRecPtr	urecptr;		/* txn's start urecptr */
+	int			idx_undo_buffers[MAX_BUFFER_PER_UNDO];
+	UnpackedUndoRecord uur;		/* undo record header */
+} XactUndoRecordInfo;
+
+static XactUndoRecordInfo xact_urec_info[MAX_XACT_UNDO_INFO];
+static int	xact_urec_info_idx;
+
+/* Prototypes for static functions. */
+UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
+				 UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence);
+static void UndoRecordPrepareTransInfo(UndoRecPtr urecptr,
+						   UndoRecPtr xact_urp,
+						   XLogReaderState *xlog_record);
+static void UndoRecordUpdateTransInfo(int idx);
+static int UndoGetBufferSlot(RelFileNode rnode, BlockNumber blk,
+				  ReadBufferMode rbm,
+				  UndoPersistence persistence, XLogReaderState *xlog_record);
+static bool UndoRecordIsValid(UndoLogControl * log,
+				  UndoRecPtr urp);
+static uint16 UndoGetPrevRecordLen(UndoRecPtr urp,
+								   UndoPersistence upersistence);
+/*
+ * Check whether the undo record is discarded or not.  If it's already
+ * discarded, return false; otherwise, return true.
+ *
+ * The caller must hold log->discard_lock.  This function will release the
+ * lock if it returns false; otherwise, the lock is still held on return and
+ * the caller needs to release it.
+ */
+static bool
+UndoRecordIsValid(UndoLogControl * log, UndoRecPtr urp)
+{
+	Assert(LWLockHeldByMeInMode(&log->discard_lock, LW_SHARED));
+
+	if (log->oldest_data == InvalidUndoRecPtr)
+	{
+		/*
+		 * oldest_data is only initialized when the discard worker first
+		 * attempts to discard undo logs, so we cannot rely on this value to
+		 * identify whether the undo record pointer is already discarded;
+		 * instead, we check by calling the undo log routine.  If it's not
+		 * yet discarded, then we have to reacquire log->discard_lock so that
+		 * it doesn't get discarded concurrently.
+		 */
+		LWLockRelease(&log->discard_lock);
+		if (UndoLogIsDiscarded(urp))
+			return false;
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+	}
+
+	/* Check again if it's already discarded. */
+	if (urp < log->oldest_data)
+	{
+		LWLockRelease(&log->discard_lock);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Prepare to update the previous transaction's next undo pointer to maintain
+ * the transaction chain in the undo.  This will read the header of the first
+ * undo record of the previous transaction and lock the necessary buffers.
+ * The actual update will be done by UndoRecordUpdateTransInfo under the
+ * critical section.
+ */
+static void
+UndoRecordPrepareTransInfo(UndoRecPtr urecptr, UndoRecPtr xact_urp,
+						   XLogReaderState *xlog_record)
+{
+	Buffer		buffer = InvalidBuffer;
+	BlockNumber cur_blk;
+	RelFileNode rnode;
+	UndoLogControl *log;
+	Page		page;
+	int			already_decoded = 0;
+	int			starting_byte;
+	int			bufidx;
+	int			index = 0;
+
+	/*
+	 * The absence of the previous transaction's undo indicates that this
+	 * backend is preparing its first undo, in which case we have nothing to
+	 * update.
+	 */
+	if (!UndoRecPtrIsValid(xact_urp))
+		return;
+
+	log = UndoLogGet(UndoRecPtrGetLogNo(xact_urp), false);
+
+	/*
+	 * Temporary undo logs are discarded on transaction commit so we don't
+	 * need to do anything.
+	 */
+	if (log->meta.persistence == UNDO_TEMP)
+		return;
+
+	/*
+	 * Acquire the discard lock before accessing the undo record so that
+	 * the discard worker doesn't remove the record while we are in the
+	 * process of reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	/*
+	 * The previous transaction's undo may already have been discarded, in
+	 * which case we have nothing to update.  UndoRecordIsValid will release
+	 * the lock if it returns false.
+	 */
+	if (!UndoRecordIsValid(log, xact_urp))
+		return;
+
+	UndoRecPtrAssignRelFileNode(rnode, xact_urp);
+	cur_blk = UndoRecPtrGetBlockNum(xact_urp);
+	starting_byte = UndoRecPtrGetPageOffset(xact_urp);
+
+	/*
+	 * Read the undo record header by calling UnpackUndoRecord.  If the undo
+	 * record header is split across buffers, then we need to read the
+	 * complete header by invoking UnpackUndoRecord multiple times.
+	 */
+	while (true)
+	{
+		bufidx = UndoGetBufferSlot(rnode, cur_blk,
+								   RBM_NORMAL,
+								   log->meta.persistence, xlog_record);
+		xact_urec_info[xact_urec_info_idx].idx_undo_buffers[index++] = bufidx;
+		buffer = undo_buffer[bufidx].buf;
+		page = BufferGetPage(buffer);
+
+		if (UnpackUndoRecord(&xact_urec_info[xact_urec_info_idx].uur, page,
+							 starting_byte, &already_decoded, true))
+			break;
+
+		/* Could not fetch the complete header so go to the next block. */
+		starting_byte = UndoLogBlockHeaderSize;
+		cur_blk++;
+	}
+
+	xact_urec_info[xact_urec_info_idx].uur.uur_next = urecptr;
+	xact_urec_info[xact_urec_info_idx].urecptr = xact_urp;
+	xact_urec_info_idx++;
+	LWLockRelease(&log->discard_lock);
+}
+
+
+/*
+ * Overwrite the first undo record of the previous transaction to update its
+ * next pointer.  This just inserts the record already prepared by
+ * UndoRecordPrepareTransInfo.  It must be called inside a critical section,
+ * and it overwrites only the undo record header, not the data.
+ */
+static void
+UndoRecordUpdateTransInfo(int idx)
+{
+	UndoLogNumber logno = UndoRecPtrGetLogNo(xact_urec_info[idx].urecptr);
+	Page		page = NULL;
+	int			starting_byte;
+	int			already_written = 0;
+	int			i = 0;
+	uint16		remaining_bytes;
+	UndoRecPtr	urec_ptr = InvalidUndoRecPtr;
+	UndoLogControl *log;
+
+	log = UndoLogGet(logno, false);
+	urec_ptr = xact_urec_info[idx].urecptr;
+
+	/*
+	 * Acquire the discard lock before accessing the undo record so that
+	 * the discard worker can't remove the record while we are in the
+	 * process of reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	if (!UndoRecordIsValid(log, urec_ptr))
+		return;
+
+	/*
+	 * Update the next transaction's start urecptr in the transaction header.
+	 */
+	starting_byte = UndoRecPtrGetPageOffset(urec_ptr);
+	remaining_bytes = sizeof(UndoRecPtr);
+	do
+	{
+		Buffer		buffer;
+		int			buf_idx;
+
+		buf_idx = xact_urec_info[idx].idx_undo_buffers[i];
+		buffer = undo_buffer[buf_idx].buf;
+
+		if (BufferIsValid(buffer))
+		{
+			page = BufferGetPage(buffer);
+			/* Overwrite the previously written undo. */
+			if (InsertUndoRecord(&xact_urec_info[idx].uur, page, starting_byte,
+				&already_written, 0, 0, true))
+			{
+				MarkBufferDirty(buffer);
+				break;
+			}
+			MarkBufferDirty(buffer);
+		}
+		else
+		{
+			/*
+			 * During recovery, some blocks might already have been removed
+			 * by the discard process, so we can just skip inserting into
+			 * those blocks.
+			 */
+			Assert(InRecovery);
+
+			/*
+			 * The block is not valid, so we cannot write to the current
+			 * block, but we might need to insert the remaining partial
+			 * record into the next block; so set a proper value for the
+			 * already_written variable to jump to the undo record offset
+			 * from which we want to insert into the next block.
+			 */
+			if (InsertUndoRecord(&xact_urec_info[idx].uur, page, starting_byte,
+				&already_written, remaining_bytes, 0, true))
+				break;
+			else
+				remaining_bytes -= (BLCKSZ - starting_byte);
+		}
+		starting_byte = UndoLogBlockHeaderSize;
+		i++;
+
+		Assert(i < MAX_BUFFER_PER_UNDO);
+	} while (true);
+
+	LWLockRelease(&log->discard_lock);
+}
+
+/*
+ * Find the block number in the undo buffer array.  If it's present, just
+ * return its index; otherwise read the buffer, insert an entry into the
+ * array, and lock the buffer in exclusive mode.
+ *
+ * Undo log insertions are append-only.  If the caller is writing new data
+ * that begins exactly at the beginning of a page, then there cannot be any
+ * useful data after that point.  In that case RBM_ZERO can be passed in as
+ * rbm so that we can skip a useless read of a disk block.  In all other
+ * cases, RBM_NORMAL should be passed in, to read the page in if it doesn't
+ * happen to be already in the buffer pool.
+ */
+static int
+UndoGetBufferSlot(RelFileNode rnode,
+				  BlockNumber blk,
+				  ReadBufferMode rbm,
+				  UndoPersistence persistence,
+				  XLogReaderState *xlog_record)
+{
+	int			i;
+	Buffer		buffer;
+	XLogRedoAction 	action = BLK_NEEDS_REDO;
+
+	/* Don't do anything, if we already have a buffer pinned for the block. */
+	for (i = 0; i < buffer_idx; i++)
+	{
+		/*
+		 * It's not enough to just compare the block number because
+		 * undo_buffer might hold undo from different undo logs (e.g. when
+		 * the previous transaction's start header is in the previous undo
+		 * log), so compare both logno and blkno.
+		 */
+		if ((blk == undo_buffer[i].blk) &&
+			(undo_buffer[i].logno == rnode.relNode))
+		{
+			/* caller must hold exclusive lock on buffer */
+			Assert(BufferIsLocal(undo_buffer[i].buf) ||
+				   LWLockHeldByMeInMode(BufferDescriptorGetContentLock(
+																	   GetBufferDescriptor(undo_buffer[i].buf - 1)),
+										LW_EXCLUSIVE));
+			break;
+		}
+	}
+
+	/*
+	 * We did not find the block, so allocate a buffer and insert it into
+	 * the undo buffer array.
+	 */
+	if (i == buffer_idx)
+	{
+		/*
+		 * Fetch the buffer in which we want to insert the undo record.
+		 */
+		if (InRecovery)
+			action = XLogReadBufferForRedoBlock(xlog_record,
+									   SMGR_UNDO,
+									   rnode,
+									   UndoLogForkNum,
+									   blk,
+									   rbm,
+									   false,
+									   &buffer);
+		else
+		{
+			buffer = ReadBufferWithoutRelcache(SMGR_UNDO,
+											   rnode,
+											   UndoLogForkNum,
+											   blk,
+											   rbm,
+											   NULL,
+											   RelPersistenceForUndoPersistence(persistence));
+
+			/* Lock the buffer */
+			LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+		}
+
+		if (action == BLK_NOTFOUND)
+		{
+			undo_buffer[buffer_idx].buf = InvalidBuffer;
+			undo_buffer[buffer_idx].blk = InvalidBlockNumber;
+		}
+		else
+		{
+			undo_buffer[buffer_idx].buf = buffer;
+			undo_buffer[buffer_idx].blk = blk;
+			undo_buffer[buffer_idx].logno = rnode.relNode;
+			undo_buffer[buffer_idx].zero = rbm == RBM_ZERO;
+		}
+		buffer_idx++;
+	}
+
+	return i;
+}
+
+/*
+ * Call UndoSetPrepareSize to set how many undo records can be prepared
+ * before we insert them.  If the size is greater than MAX_PREPARED_UNDO,
+ * then extra memory is allocated to hold the extra prepared undo records.
+ *
+ * This is normally used when more than one undo record needs to be prepared.
+ */
+void
+UndoSetPrepareSize(int nrecords)
+{
+	if (nrecords <= MAX_PREPARED_UNDO)
+		return;
+
+	prepared_undo = palloc0(nrecords * sizeof(PreparedUndoSpace));
+
+	/*
+	 * Consider buffers needed for updating the previous transaction's
+	 * starting undo record as well; hence, increase the count by 1.
+	 */
+	undo_buffer = palloc0((nrecords + 1) * MAX_BUFFER_PER_UNDO *
+						  sizeof(UndoBuffers));
+	max_prepared_undo = nrecords;
+}
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record you
+ * intend to insert.  Upon return, the necessary undo buffers are pinned and
+ * locked.
+ *
+ * This should be done before any critical section is established, since it
+ * can fail.
+ *
+ * In recovery, 'fxid' refers to the transaction id stored in WAL; otherwise,
+ * it refers to the top transaction id, because the undo log only stores the
+ * mapping for topmost transactions.
+ */
+UndoRecPtr
+PrepareUndoInsert(UnpackedUndoRecord *urec, FullTransactionId fxid,
+				  UndoPersistence upersistence,
+				  XLogReaderState *xlog_record)
+{
+	UndoRecordSize size;
+	UndoRecPtr	urecptr;
+	RelFileNode rnode;
+	UndoRecordSize cur_size = 0;
+	BlockNumber cur_blk;
+	FullTransactionId txid;
+	int			starting_byte;
+	int			index = 0;
+	int			bufidx;
+	ReadBufferMode rbm;
+	bool		need_xact_header;
+	UndoRecPtr	try_location;
+	UndoRecPtr	last_xact_start;
+	UndoRecPtr	prevlog_xact_start = InvalidUndoRecPtr;
+	UndoRecPtr	prevlog_insert_urp = InvalidUndoRecPtr;
+	UndoRecPtr	prevlogurp = InvalidUndoRecPtr;
+
+	/* Already reached maximum prepared limit. */
+	if (prepare_idx == max_prepared_undo)
+		elog(ERROR, "already reached the maximum prepared limit");
+
+	if (!FullTransactionIdIsValid(fxid))
+	{
+		/* During recovery, we must have a valid transaction id. */
+		Assert(!InRecovery);
+		txid = GetTopFullTransactionId();
+	}
+	else
+	{
+		/*
+		 * Assign the top transaction id because undo log only stores mapping
+		 * for the top most transactions.
+		 */
+		Assert(InRecovery ||
+			   FullTransactionIdEquals(fxid, GetTopFullTransactionId()));
+		txid = fxid;
+	}
+
+	/*
+	 * We don't yet know if this record needs a transaction header (ie is the
+	 * first undo record for a given transaction in a given undo log), because
+	 * you can only find out by allocating.  We'll resolve this circularity by
+	 * allocating enough space for a transaction header.  We'll only advance
+	 * by as many bytes as we turn out to need.
+	 */
+	urec->uur_next = InvalidUndoRecPtr;
+	urec->uur_xidepoch = EpochFromFullTransactionId(txid);
+	UndoRecordSetInfo(urec);
+	urec->uur_info |= UREC_INFO_TRANSACTION;
+	size = UndoRecordExpectedSize(urec);
+
+	/*
+	 * Since we don't actually advance the insert pointer until later in
+	 * InsertPreparedUndo(), but we may need to allocate space for several
+	 * undo records, we need to keep track of the insert pointer as we go.
+	 */
+	if (prepare_idx == 0)
+	{
+		/* Nothing allocated already; just ask for some space anywhere. */
+		try_location = InvalidUndoRecPtr;
+	}
+	else
+	{
+		/*
+		 * Ask to extend the space immediately after the last record, if
+		 * possible.  A new undo log will be chosen otherwise.
+		 */
+		PreparedUndoSpace *space = &prepared_undo[prepare_idx - 1];
+
+		try_location = UndoLogOffsetPlusUsableBytes(space->urp, space->size);
+	}
+
+	/* Allocate space for the record. */
+	if (InRecovery)
+	{
+		/*
+		 * We'll figure out where the space needs to be allocated by
+		 * inspecting the xlog_record.
+		 */
+		Assert(upersistence == UNDO_PERMANENT);
+		urecptr = UndoLogAllocateInRecovery(XidFromFullTransactionId(txid),
+											size, try_location,
+											&need_xact_header,
+											&last_xact_start,
+											&prevlog_xact_start,
+											&prevlogurp,
+											xlog_record);
+	}
+	else
+	{
+		urecptr = UndoLogAllocate(size, try_location, upersistence,
+								  &need_xact_header, &last_xact_start,
+								  &prevlog_xact_start, &prevlog_insert_urp);
+		if (UndoRecPtrIsValid(prevlog_xact_start))
+		{
+			uint16	prevlen;
+
+			Assert(UndoRecPtrIsValid(prevlog_insert_urp));
+			/* Fetch length of the last undo record of the previous log. */
+			prevlen = UndoGetPrevRecordLen(prevlog_insert_urp, upersistence);
+			/* Compute the last record's undo record pointer. */
+			prevlogurp =
+				MakeUndoRecPtr(UndoRecPtrGetLogNo(prevlog_insert_urp),
+							   (UndoRecPtrGetOffset(prevlog_insert_urp) - prevlen));
+			/*
+			 * The undo log switched, so set the prevlog info in the current
+			 * undo log.
+			 *
+			 * XXX Can we do this directly in UndoLogAllocate?  For that,
+			 * UndoLogAllocate might need to read the length of the last
+			 * undo record from the previous undo log, perhaps via a
+			 * callback.
+			 */
+			UndoLogSwitchSetPrevLogInfo(UndoRecPtrGetLogNo(urecptr),
+										prevlog_xact_start, prevlogurp);
+		}
+	}
+
+	urec->uur_prevurp = prevlogurp;
+
+	/* Initialize transaction related members. */
+	urec->uur_progress = 0;
+	if (need_xact_header)
+	{
+		/*
+		 * TODO: Should we set urec->uur_dbid automatically?  How can you do
+		 * that, in recovery -- can we extract it from xlog_record?  For now
+		 * assume that the caller set it explicitly.
+		 */
+	}
+	else
+	{
+		urec->uur_dbid = 0;
+
+		/* We don't need a transaction header after all. */
+		urec->uur_info &= ~UREC_INFO_TRANSACTION;
+		size = UndoRecordExpectedSize(urec);
+	}
+
+	/*
+	 * If there is a physically preceding transaction in this undo log, and we
+	 * are writing the first record for this transaction that is in this undo
+	 * log (not necessarily the first ever for the transaction, because we
+	 * could have switched logs), then we need to update the size of the
+	 * preceding transaction.
+	 */
+	if (need_xact_header &&
+		UndoRecPtrGetOffset(urecptr) > UndoLogBlockHeaderSize)
+		UndoRecordPrepareTransInfo(urecptr, last_xact_start, xlog_record);
+	/*
+	 * If prevlog_xact_start is valid, the transaction's undo records are
+	 * split across undo logs, so we need to update our own transaction
+	 * header in the previous log as well.
+	 */
+	if (UndoRecPtrIsValid(prevlog_xact_start))
+	{
+		Assert(UndoRecPtrIsValid(prevlogurp));
+		UndoRecordPrepareTransInfo(urecptr, prevlog_xact_start, xlog_record);
+	}
+
+	cur_blk = UndoRecPtrGetBlockNum(urecptr);
+	UndoRecPtrAssignRelFileNode(rnode, urecptr);
+	starting_byte = UndoRecPtrGetPageOffset(urecptr);
+
+	/*
+	 * If we happen to be writing the very first byte into this page, then
+	 * there is no need to read from disk.
+	 */
+	if (starting_byte == UndoLogBlockHeaderSize)
+		rbm = RBM_ZERO;
+	else
+		rbm = RBM_NORMAL;
+
+	do
+	{
+		bufidx = UndoGetBufferSlot(rnode, cur_blk, rbm, upersistence,
+								   xlog_record);
+		if (cur_size == 0)
+			cur_size = BLCKSZ - starting_byte;
+		else
+			cur_size += BLCKSZ - UndoLogBlockHeaderSize;
+
+		/* undo record can't use buffers more than MAX_BUFFER_PER_UNDO. */
+		Assert(index < MAX_BUFFER_PER_UNDO);
+
+		/* Keep track of the buffers we have pinned and locked. */
+		prepared_undo[prepare_idx].undo_buffer_idx[index++] = bufidx;
+
+		/*
+		 * If we need more pages they'll be all new so we can definitely skip
+		 * reading from disk.
+		 */
+		rbm = RBM_ZERO;
+		cur_blk++;
+	} while (cur_size < size);
+
+	/*
+	 * Save the undo record information to be later used by InsertPreparedUndo
+	 * to insert the prepared record.
+	 */
+	prepared_undo[prepare_idx].urec = urec;
+	prepared_undo[prepare_idx].urp = urecptr;
+	prepared_undo[prepare_idx].size = size;
+	prepare_idx++;
+
+	return urecptr;
+}
+
+/*
+ * Insert a previously-prepared undo record.  This will write the actual undo
+ * record into the buffers already pinned and locked by PrepareUndoInsert,
+ * and mark them dirty.  This step should be performed after entering a
+ * critical section; it should never fail.
+ */
+void
+InsertPreparedUndo(void)
+{
+	Page		page = NULL;
+	int			starting_byte;
+	int			already_written;
+	int			bufidx = 0;
+	int			idx;
+	uint16		undo_len = 0;
+	uint16		remaining_bytes;
+	UndoRecPtr	urp;
+	UnpackedUndoRecord *uur;
+	uint16		size;
+
+	/* There must be at least one prepared undo record. */
+	Assert(prepare_idx > 0);
+
+	/*
+	 * This must be called under a critical section or we must be in recovery.
+	 */
+	Assert(InRecovery || CritSectionCount > 0);
+
+	for (idx = 0; idx < prepare_idx; idx++)
+	{
+		uur = prepared_undo[idx].urec;
+		urp = prepared_undo[idx].urp;
+		size = prepared_undo[idx].size;
+
+		Assert(size == UndoRecordExpectedSize(uur));
+
+		already_written = 0;
+		bufidx = 0;
+		starting_byte = UndoRecPtrGetPageOffset(urp);
+		undo_len = remaining_bytes = UndoRecordExpectedSize(uur);
+
+		do
+		{
+			PreparedUndoSpace undospace = prepared_undo[idx];
+			Buffer		buffer;
+
+			buffer = undo_buffer[undospace.undo_buffer_idx[bufidx]].buf;
+			if (BufferIsValid(buffer))
+			{
+				page = BufferGetPage(buffer);
+
+				/*
+				 * Initialize the page whenever we write the first record in
+				 * a page.  We start writing immediately after the block
+				 * header.
+				 */
+				if (starting_byte == UndoLogBlockHeaderSize)
+					PageInit(page, BLCKSZ, 0);
+
+				/*
+				 * Try to insert the record into the current page.  If that
+				 * doesn't succeed, call the routine again with the next
+				 * page.
+				 */
+				if (InsertUndoRecord(uur, page, starting_byte, &already_written, 0,
+									 undo_len, false))
+				{
+					undo_len += already_written;
+					MarkBufferDirty(buffer);
+					break;
+				}
+				MarkBufferDirty(buffer);
+			}
+			else
+			{
+				/*
+				 * During recovery, some blocks might already have been
+				 * removed by the discard process, so we can just skip
+				 * inserting into those blocks.
+				 */
+				Assert(InRecovery);
+
+				/*
+				 * The block is not valid, so we cannot write to the current
+				 * block, but we might need to insert the remaining partial
+				 * record into the next block; so set a proper value for the
+				 * already_written variable to jump to the undo record offset
+				 * from which we want to insert into the next block.
+				 * InsertUndoRecord will not write anything if the input page
+				 * is NULL; it just updates the already_written count and the
+				 * local work header.
+				 */
+				if (InsertUndoRecord(uur, page, starting_byte, &already_written,
+					remaining_bytes, undo_len, false))
+					break;
+				else
+					remaining_bytes -= (BLCKSZ - starting_byte);
+			}
+
+			/* Insert remaining record in next block. */
+			starting_byte = UndoLogBlockHeaderSize;
+			bufidx++;
+
+			/* undo record can't use buffers more than MAX_BUFFER_PER_UNDO. */
+			Assert(bufidx < MAX_BUFFER_PER_UNDO);
+		} while (true);
+
+		/* Advance the insert pointer past this record. */
+		UndoLogAdvance(urp, size);
+
+		/*
+		 * Set the current undo location for a transaction.  This is required
+		 * to perform rollback during abort of transaction.
+		 */
+		SetCurrentUndoLocation(urp);
+	}
+
+	/* Update previously prepared transaction headers. */
+	if (xact_urec_info_idx > 0)
+	{
+		int			i = 0;
+
+		for (i = 0; i < xact_urec_info_idx; i++)
+			UndoRecordUpdateTransInfo(i);
+	}
+}
+
+/*
+ * Helper function for UndoFetchRecord.  It will fetch the undo record pointed
+ * to by urp and unpack the record into urec.  This function will not release
+ * the pin on the buffer if the complete record is fetched from one buffer, so
+ * the caller can reuse the same urec to fetch another undo record which is on
+ * the same block.  The caller is responsible for releasing the buffer inside
+ * urec and setting it to invalid if it wishes to fetch a record from another
+ * block.
+ */
+UnpackedUndoRecord *
+UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence)
+{
+	Buffer		buffer = urec->uur_buffer;
+	Page		page;
+	int			starting_byte = UndoRecPtrGetPageOffset(urp);
+	int			already_decoded = 0;
+	BlockNumber cur_blk;
+	bool		is_undo_rec_split = false;
+
+	cur_blk = UndoRecPtrGetBlockNum(urp);
+
+	/* If we already have a buffer pin then no need to allocate a new one. */
+	if (!BufferIsValid(buffer))
+	{
+		buffer = ReadBufferWithoutRelcache(SMGR_UNDO,
+										   rnode, UndoLogForkNum, cur_blk,
+										   RBM_NORMAL, NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+
+		urec->uur_buffer = buffer;
+	}
+
+	while (true)
+	{
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buffer);
+
+		/*
+		 * XXX This can be optimized to fetch just the header first and, only
+		 * if it matches the block number and offset, fetch the complete
+		 * record.
+		 */
+		if (UnpackUndoRecord(urec, page, starting_byte, &already_decoded, false))
+			break;
+
+		starting_byte = UndoLogBlockHeaderSize;
+		is_undo_rec_split = true;
+
+		/*
+		 * The record spans more than a page so we would have copied it (see
+		 * UnpackUndoRecord).  In such cases, we can release the buffer.
+		 */
+		urec->uur_buffer = InvalidBuffer;
+		UnlockReleaseBuffer(buffer);
+
+		/* Go to next block. */
+		cur_blk++;
+		buffer = ReadBufferWithoutRelcache(SMGR_UNDO,
+										   rnode, UndoLogForkNum, cur_blk,
+										   RBM_NORMAL, NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+	}
+
+	/*
+	 * If we have copied the data then release the buffer, otherwise, just
+	 * unlock it.
+	 */
+	if (is_undo_rec_split)
+		UnlockReleaseBuffer(buffer);
+	else
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+	return urec;
+}
+
+/*
+ * ResetUndoRecord - Helper function for UndoFetchRecord to reset the current
+ * record.
+ */
+static void
+ResetUndoRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode *rnode,
+				RelFileNode *prevrec_rnode)
+{
+	/*
+	 * If we have a valid buffer pinned, keep it only if we want to find the
+	 * next tuple in the same block.  Otherwise, release the buffer and set
+	 * it invalid.
+	 */
+	if (BufferIsValid(urec->uur_buffer))
+	{
+		/*
+		 * Undo buffer will be changed if the next undo record belongs to a
+		 * different block or undo log.
+		 */
+		if ((UndoRecPtrGetBlockNum(urp) !=
+			 BufferGetBlockNumber(urec->uur_buffer)) ||
+			(prevrec_rnode->relNode != rnode->relNode))
+		{
+			ReleaseBuffer(urec->uur_buffer);
+			urec->uur_buffer = InvalidBuffer;
+		}
+	}
+	else
+	{
+		/*
+		 * If there is no valid buffer in urec->uur_buffer, that means we
+		 * copied the payload data and tuple data, so free them.
+		 */
+		if (urec->uur_payload.data)
+			pfree(urec->uur_payload.data);
+		if (urec->uur_tuple.data)
+			pfree(urec->uur_tuple.data);
+	}
+
+	/* Reset the urec before fetching the tuple */
+	urec->uur_tuple.data = NULL;
+	urec->uur_tuple.len = 0;
+	urec->uur_payload.data = NULL;
+	urec->uur_payload.len = 0;
+}
+
+/*
+ * Fetch the next undo record for the given blkno, offset and transaction id (if
+ * valid).  The same tuple can be modified by multiple transactions, so during
+ * undo chain traversal sometimes we need to distinguish based on transaction
+ * id.  Callers that don't have any such requirement can pass
+ * InvalidTransactionId.
+ *
+ * Start the search from urp.  The caller needs to call UndoRecordRelease to
+ * release the resources allocated by this function.
+ *
+ * urec_ptr_out is set to the undo record pointer of the qualifying undo
+ * record, if a valid pointer is passed in.
+ *
+ * The callback function decides whether a particular undo record satisfies
+ * the caller's condition.
+ *
+ * Returns the required undo record if found; otherwise, returns NULL, which
+ * means either the record has already been discarded or there is no such
+ * record in the undo chain.
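+ *
+ * A minimal usage sketch (the callback is caller-supplied):
+ *
+ *		uur = UndoFetchRecord(urp, blkno, offnum, xid, &urec_ptr, callback);
+ *		if (uur != NULL)
+ *		{
+ *			... use uur ...
+ *			UndoRecordRelease(uur);
+ *		}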
+ */
+UnpackedUndoRecord *
+UndoFetchRecord(UndoRecPtr urp, BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback)
+{
+	RelFileNode rnode,
+				prevrec_rnode = {0};
+	UnpackedUndoRecord *urec = NULL;
+	int			logno;
+
+	if (urec_ptr_out)
+		*urec_ptr_out = InvalidUndoRecPtr;
+
+	urec = palloc0(sizeof(UnpackedUndoRecord));
+	UndoRecPtrAssignRelFileNode(rnode, urp);
+
+	/* Find the undo record pointer we are interested in. */
+	while (true)
+	{
+		UndoLogControl *log;
+
+		logno = UndoRecPtrGetLogNo(urp);
+		log = UndoLogGet(logno, true);
+		if (log == NULL)
+		{
+			if (BufferIsValid(urec->uur_buffer))
+				ReleaseBuffer(urec->uur_buffer);
+			return NULL;
+		}
+
+		/*
+		 * Prevent UndoDiscardOneLog() from discarding data while we try to
+		 * read it.  Usually we would acquire log->mutex to read log->meta
+		 * members, but in this case we know that discard can't move without
+		 * also holding log->discard_lock.
+		 */
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+		if (!UndoRecordIsValid(log, urp))
+		{
+			if (BufferIsValid(urec->uur_buffer))
+				ReleaseBuffer(urec->uur_buffer);
+			return NULL;
+		}
+
+		/* Fetch the current undo record. */
+		urec = UndoGetOneRecord(urec, urp, rnode, log->meta.persistence);
+		LWLockRelease(&log->discard_lock);
+
+		if (blkno == InvalidBlockNumber)
+			break;
+
+		/* Check whether the undo record satisfies the conditions. */
+		if (callback(urec, blkno, offset, xid))
+			break;
+
+		urp = urec->uur_blkprev;
+		prevrec_rnode = rnode;
+
+		/* Get rnode for the current undo record pointer. */
+		UndoRecPtrAssignRelFileNode(rnode, urp);
+
+		/* Reset the current undorecord before fetching the next. */
+		ResetUndoRecord(urec, urp, &rnode, &prevrec_rnode);
+	}
+
+	if (urec_ptr_out)
+		*urec_ptr_out = urp;
+	return urec;
+}
+
+/*
+ * Release the resources allocated by UndoFetchRecord.
+ */
+void
+UndoRecordRelease(UnpackedUndoRecord *urec)
+{
+	/*
+	 * If the undo record has a valid buffer then just release the buffer
+	 * otherwise free the tuple and payload data.
+	 */
+	if (BufferIsValid(urec->uur_buffer))
+	{
+		ReleaseBuffer(urec->uur_buffer);
+	}
+	else
+	{
+		if (urec->uur_payload.data)
+			pfree(urec->uur_payload.data);
+		if (urec->uur_tuple.data)
+			pfree(urec->uur_tuple.data);
+	}
+
+	pfree(urec);
+}
+
+/*
+ * RegisterUndoLogBuffers - Register the undo buffers.
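+ *
+ * A sketch of how this fits into a caller's WAL insertion sequence
+ * (XLogBeginInsert, XLogRegisterData and XLogInsert are the core xlog
+ * routines; what other data gets registered is up to the caller):
+ *
+ *		XLogBeginInsert();
+ *		XLogRegisterData(...);
+ *		RegisterUndoLogBuffers(first_block_id);
+ *		recptr = XLogInsert(rmid, info);
+ *		UndoLogBuffersSetLSN(recptr);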
+ */
+void
+RegisterUndoLogBuffers(uint8 first_block_id)
+{
+	int			idx;
+	int			flags;
+
+	for (idx = 0; idx < buffer_idx; idx++)
+	{
+		flags = undo_buffer[idx].zero
+			? REGBUF_KEEP_DATA_AFTER_CP | REGBUF_WILL_INIT
+			: REGBUF_KEEP_DATA_AFTER_CP;
+		XLogRegisterBuffer(first_block_id + idx, undo_buffer[idx].buf, flags);
+		UndoLogRegister(first_block_id + idx, undo_buffer[idx].logno);
+	}
+}
+
+/*
+ * UndoLogBuffersSetLSN - Set LSN on undo page.
+*/
+void
+UndoLogBuffersSetLSN(XLogRecPtr recptr)
+{
+	int			idx;
+
+	for (idx = 0; idx < buffer_idx; idx++)
+		PageSetLSN(BufferGetPage(undo_buffer[idx].buf), recptr);
+}
+
+/*
+ * Reset the global variables related to undo buffers.  This is required at
+ * transaction abort and while releasing the undo buffers.
+ */
+void
+ResetUndoBuffers(void)
+{
+	int			i;
+
+	for (i = 0; i < buffer_idx; i++)
+	{
+		undo_buffer[i].blk = InvalidBlockNumber;
+		undo_buffer[i].buf = InvalidBuffer;
+	}
+
+	for (i = 0; i < xact_urec_info_idx; i++)
+		xact_urec_info[i].urecptr = InvalidUndoRecPtr;
+
+	/* Reset the prepared index. */
+	prepare_idx = 0;
+	buffer_idx = 0;
+	xact_urec_info_idx = 0;
+	prepared_urec_ptr = InvalidUndoRecPtr;
+
+	/*
+	 * If the max_prepared_undo limit was changed, free the allocated memory
+	 * and reset all the variables back to their default values.
+	 */
+	if (max_prepared_undo > MAX_PREPARED_UNDO)
+	{
+		pfree(undo_buffer);
+		pfree(prepared_undo);
+		undo_buffer = def_buffers;
+		prepared_undo = def_prepared;
+		max_prepared_undo = MAX_PREPARED_UNDO;
+	}
+}
+
+/*
+ * Unlock and release the undo buffers.  This step must be performed after
+ * exiting any critical section where we have performed undo actions.
+ */
+void
+UnlockReleaseUndoBuffers(void)
+{
+	int			i;
+
+	for (i = 0; i < buffer_idx; i++)
+	{
+		if (BufferIsValid(undo_buffer[i].buf))
+			UnlockReleaseBuffer(undo_buffer[i].buf);
+	}
+	ResetUndoBuffers();
+}
+
+/*
+ * UndoGetPrevRecordLen - read length of the previous undo record.
+ *
+ * This function takes an undo record pointer as input and reads the length
+ * of the previous undo record, which is stored at the end of the previous
+ * undo record.  If the previous undo record is split across pages, this
+ * adds the undo block header size to the total length.
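+ *
+ * With the returned length, the caller can compute the previous record's
+ * undo record pointer, as PrepareUndoInsert does:
+ *
+ *		prev = MakeUndoRecPtr(UndoRecPtrGetLogNo(urp),
+ *							  UndoRecPtrGetOffset(urp) - prevlen);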
+ */
+static uint16
+UndoGetPrevRecordLen(UndoRecPtr urp, UndoPersistence upersistence)
+{
+	UndoLogOffset page_offset = UndoRecPtrGetPageOffset(urp);
+	BlockNumber	  cur_blk = UndoRecPtrGetBlockNum(urp);
+	Buffer	buffer;
+	char   *page;
+	char	prevlen[2];
+	RelFileNode rnode;
+	int		byte_to_read = sizeof(uint16);
+	char	persistence;
+	uint16	prev_rec_len = 0;
+
+	/* Get relfilenode. */
+	UndoRecPtrAssignRelFileNode(rnode, urp);
+	persistence = RelPersistenceForUndoPersistence(upersistence);
+
+	buffer = ReadBufferWithoutRelcache(SMGR_UNDO, rnode, UndoLogForkNum,
+									   cur_blk, RBM_NORMAL, NULL, persistence);
+
+	LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+	page = (char *)BufferGetPage(buffer);
+
+	/*
+	 * The length of the previous undo record is stored at the end of that
+	 * record, so just fetch the last 2 bytes.
+	 */
+	while(byte_to_read > 0)
+	{
+		page_offset -= 1;
+
+		/*
+		 * Read the next prevlen byte from the current page if page_offset
+		 * hasn't reached the undo block header.  Otherwise, move to the
+		 * previous page.
+		 */
+		if (page_offset >= UndoLogBlockHeaderSize)
+		{
+			prevlen[byte_to_read - 1] = page[page_offset];
+			byte_to_read -= 1;
+		}
+		else
+		{
+			/* Release the current buffer before moving to the previous block. */
+			UnlockReleaseBuffer(buffer);
+			cur_blk -= 1;
+			persistence = RelPersistenceForUndoPersistence(upersistence);
+			buffer = ReadBufferWithoutRelcache(SMGR_UNDO, rnode, UndoLogForkNum,
+											   cur_blk, RBM_NORMAL, NULL,
+											   persistence);
+			LockBuffer(buffer, BUFFER_LOCK_SHARE);
+			page_offset = BLCKSZ;
+			page = (char *)BufferGetPage(buffer);
+		}
+	}
+
+	prev_rec_len = *(uint16 *) (prevlen);
+
+	/*
+	 * If the previous undo record is not completely stored in this page,
+	 * add UndoLogBlockHeaderSize to the total length so that the caller can
+	 * use this length to compute the undo record pointer of the previous
+	 * undo record.
+	 */
+	if (UndoRecPtrGetPageOffset(urp) - UndoLogBlockHeaderSize < prev_rec_len)
+		prev_rec_len += UndoLogBlockHeaderSize;
+
+	/* Release the buffer if we have locally read it. */
+	UnlockReleaseBuffer(buffer);
+
+	return prev_rec_len;
+}
diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c
new file mode 100644
index 0000000..f11a20c
--- /dev/null
+++ b/src/backend/access/undo/undorecord.c
@@ -0,0 +1,494 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.c
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorecord.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/undorecord.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+
+/* Workspace for InsertUndoRecord and UnpackUndoRecord. */
+static UndoRecordHeader work_hdr;
+static UndoRecordRelationDetails work_rd;
+static UndoRecordBlock work_blk;
+static UndoRecordTransaction work_txn;
+static UndoRecordPayload work_payload;
+
+/* Prototypes for static functions. */
+static bool InsertUndoBytes(char *sourceptr, int sourcelen,
+				char **writeptr, char *endptr,
+				int *my_bytes_written, int *total_bytes_written);
+static bool ReadUndoBytes(char *destptr, int readlen,
+			  char **readptr, char *endptr,
+			  int *my_bytes_read, int *total_bytes_read, bool nocopy);
+
+/*
+ * Compute and return the expected size of an undo record.
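+ *
+ * For example, a record whose uur_info has only UREC_INFO_BLOCK set occupies
+ * SizeOfUndoRecordHeader + SizeOfUndoRecordBlock + sizeof(uint16) bytes, the
+ * trailing uint16 being the record length that is stored at the end of every
+ * record.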
+ */
+Size
+UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+{
+	Size		size;
+
+	size = SizeOfUndoRecordHeader + sizeof(uint16);
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0)
+		size += SizeOfUndoRecordRelationDetails;
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+		size += SizeOfUndoRecordBlock;
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+		size += SizeOfUndoRecordTransaction;
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		size += SizeOfUndoRecordPayload;
+		size += uur->uur_payload.len;
+		size += uur->uur_tuple.len;
+	}
+
+	return size;
+}
+
+/*
+ * To insert an undo record, call InsertUndoRecord() repeatedly until it
+ * returns true.
+ *
+ * Insert as much of an undo record as will fit in the given page.
+ * starting_byte is the byte within the given page at which to begin writing,
+ * while *already_written is the number of bytes written to previous pages.
+ *
+ * Returns true if the remainder of the record was written and false if more
+ * bytes remain to be written; in either case, *already_written is set to the
+ * number of bytes written thus far.
+ *
+ * This function assumes that if *already_written is non-zero on entry, the
+ * same UnpackedUndoRecord is passed each time.  It also assumes that
+ * UnpackUndoRecord is not called between successive calls to InsertUndoRecord
+ * for the same UnpackedUndoRecord.
+ *
+ * If this function is called again to continue writing the record, the
+ * previous value for *already_written should be passed again, and
+ * starting_byte should be passed as sizeof(PageHeaderData) (since the record
+ * will continue immediately following the page header).
+ *
+ * 'remaining_bytes' is the number of bytes yet to be written.  This value is
+ * only considered when page is NULL, which is the case when the caller just
+ * wants the local work_hdr and the already_written variable to be updated
+ * without inserting actual data into the current block; work_hdr must still
+ * be updated so that we can insert the remaining partial record into the
+ * next valid block.
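+ *
+ * A typical insertion loop looks like this sketch (obtaining each successive
+ * page is up to the caller):
+ *
+ *		while (!InsertUndoRecord(uur, page, starting_byte, &already_written,
+ *								 0, undo_len, false))
+ *		{
+ *			page = ... the next page ...;
+ *			starting_byte = sizeof(PageHeaderData);
+ *		}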
+ */
+bool
+InsertUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_written, int remaining_bytes,
+				 uint16 undo_len, bool header_only)
+{
+	char	   *writeptr = (char *) page + starting_byte;
+	char	   *endptr = (char *) page + BLCKSZ;
+	int			my_bytes_written = *already_written;
+
+	/* The undo record must contain valid information. */
+	Assert(uur->uur_info != 0);
+
+	/* Unless the caller is updating only the header, it must provide a valid length. */
+	Assert(header_only || undo_len > 0);
+
+	/*
+	 * If this is the first call, copy the UnpackedUndoRecord into the
+	 * temporary variables of the types that will actually be stored in the
+	 * undo pages.  We just initialize everything here, on the assumption that
+	 * it's not worth adding branches to save a handful of assignments.
+	 */
+	if (*already_written == 0)
+	{
+		work_hdr.urec_rmid = uur->uur_rmid;
+		work_hdr.urec_type = uur->uur_type;
+		work_hdr.urec_info = uur->uur_info;
+		work_hdr.urec_reloid = uur->uur_reloid;
+		work_hdr.urec_prevxid = uur->uur_prevxid;
+		work_hdr.urec_xid = uur->uur_xid;
+		work_hdr.urec_cid = uur->uur_cid;
+		work_rd.urec_fork = uur->uur_fork;
+		work_blk.urec_blkprev = uur->uur_blkprev;
+		work_blk.urec_block = uur->uur_block;
+		work_blk.urec_offset = uur->uur_offset;
+		work_txn.urec_progress = uur->uur_progress;
+		work_txn.urec_xidepoch = uur->uur_xidepoch;
+		work_txn.urec_dbid = uur->uur_dbid;
+		work_txn.urec_prevurp = uur->uur_prevurp;
+		work_txn.urec_next = uur->uur_next;
+		work_payload.urec_payload_len = uur->uur_payload.len;
+		work_payload.urec_tuple_len = uur->uur_tuple.len;
+	}
+	else
+	{
+		/*
+		 * We should have been passed the same record descriptor as before, or
+		 * the caller has messed up.
+		 */
+		Assert(work_hdr.urec_rmid == uur->uur_rmid);
+		Assert(work_hdr.urec_type == uur->uur_type);
+		Assert(work_hdr.urec_info == uur->uur_info);
+		Assert(work_hdr.urec_reloid == uur->uur_reloid);
+		Assert(work_hdr.urec_prevxid == uur->uur_prevxid);
+		Assert(work_hdr.urec_xid == uur->uur_xid);
+		Assert(work_hdr.urec_cid == uur->uur_cid);
+		Assert(work_rd.urec_fork == uur->uur_fork);
+		Assert(work_blk.urec_blkprev == uur->uur_blkprev);
+		Assert(work_blk.urec_block == uur->uur_block);
+		Assert(work_blk.urec_offset == uur->uur_offset);
+		Assert(work_txn.urec_progress == uur->uur_progress);
+		Assert(work_txn.urec_xidepoch == uur->uur_xidepoch);
+		Assert(work_txn.urec_dbid == uur->uur_dbid);
+		Assert(work_txn.urec_prevurp == uur->uur_prevurp);
+		Assert(work_txn.urec_next == uur->uur_next);
+		Assert(work_payload.urec_payload_len == uur->uur_payload.len);
+		Assert(work_payload.urec_tuple_len == uur->uur_tuple.len);
+	}
+
+	/*
+	 * Update the already_written variable and return; see the detailed
+	 * comment in the function header.
+	 */
+	if (page == NULL)
+	{
+		*already_written += (BLCKSZ - starting_byte);
+		if (remaining_bytes <= (BLCKSZ - starting_byte))
+			return true;
+		else
+			return false;
+	}
+
+	/* Write header (if not already done). */
+	if (!InsertUndoBytes((char *) &work_hdr, SizeOfUndoRecordHeader,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write relation details (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0 &&
+		!InsertUndoBytes((char *) &work_rd, SizeOfUndoRecordRelationDetails,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write block information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0 &&
+		!InsertUndoBytes((char *) &work_blk, SizeOfUndoRecordBlock,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write transaction information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0 &&
+		!InsertUndoBytes((char *) &work_txn, SizeOfUndoRecordTransaction,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	if (header_only)
+		return true;
+
+	/* Write payload information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		/* Payload header. */
+		if (!InsertUndoBytes((char *) &work_payload, SizeOfUndoRecordPayload,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+
+		/* Payload bytes. */
+		if (uur->uur_payload.len > 0 &&
+			!InsertUndoBytes(uur->uur_payload.data, uur->uur_payload.len,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+
+		/* Tuple bytes. */
+		if (uur->uur_tuple.len > 0 &&
+			!InsertUndoBytes(uur->uur_tuple.data, uur->uur_tuple.len,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+	}
+
+	/* Insert undo record length at the end of the record. */
+	if (!InsertUndoBytes((char *) &undo_len, sizeof(uint16),
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Hooray! */
+	return true;
+}
+
+/*
+ * Write undo bytes from a particular source, but only to the extent that
+ * they weren't written previously and will fit.
+ *
+ * 'sourceptr' points to the source data, and 'sourcelen' is the length of
+ * that data in bytes.
+ *
+ * 'writeptr' points to the insertion point for these bytes, and is updated
+ * for whatever we write.  The insertion point must not pass 'endptr', which
+ * represents the end of the buffer into which we are writing.
+ *
+ * 'my_bytes_written' is a pointer to the count of previously-written bytes
+ * from this and following structures in this undo record; that is, any
+ * bytes that are part of previous structures in the record have already
+ * been subtracted out.
+ *
+ * 'total_bytes_written' points to the count of all previously-written bytes,
+ * and must likewise be updated for the bytes we write.
+ *
+ * The return value is false if we ran out of space before writing all
+ * the bytes, and otherwise true.
+ */
+static bool
+InsertUndoBytes(char *sourceptr, int sourcelen,
+				char **writeptr, char *endptr,
+				int *my_bytes_written, int *total_bytes_written)
+{
+	int			can_write;
+	int			remaining;
+
+	/*
+	 * If we've previously written all of these bytes, there's nothing to do
+	 * except update *my_bytes_written, which we must do to ensure that the
+	 * next call to this function gets the right starting value.
+	 */
+	if (*my_bytes_written >= sourcelen)
+	{
+		*my_bytes_written -= sourcelen;
+		return true;
+	}
+
+	/* Compute number of bytes we can write. */
+	remaining = sourcelen - *my_bytes_written;
+	can_write = Min(remaining, endptr - *writeptr);
+
+	/* Bail out if no bytes can be written. */
+	if (can_write == 0)
+		return false;
+
+	/* Copy the bytes we can write. */
+	memcpy(*writeptr, sourceptr + *my_bytes_written, can_write);
+
+	/* Update bookkeeping information. */
+	*writeptr += can_write;
+	*total_bytes_written += can_write;
+	*my_bytes_written = 0;
+
+	/* Return true only if we wrote the whole thing. */
+	return (can_write == remaining);
+}
+
+/*
+ * Call UnpackUndoRecord() one or more times to unpack an undo record.  For
+ * the first call, starting_byte should be set to the beginning of the undo
+ * record within the specified page, and *already_decoded should be set to 0;
+ * the function will update it based on the number of bytes decoded.  The
+ * return value is true if the entire record was unpacked and false if the
+ * record continues on the next page.  In the latter case, the function
+ * should be called again with the next page, passing starting_byte as
+ * sizeof(PageHeaderData).
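+ *
+ * A sketch of the read loop (UndoGetOneRecord contains the real one, which
+ * reads and locks each successive block):
+ *
+ *		while (!UnpackUndoRecord(uur, page, starting_byte, &already_decoded,
+ *								 false))
+ *		{
+ *			page = ... the next page ...;
+ *			starting_byte = UndoLogBlockHeaderSize;
+ *		}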
+ */
+bool
+UnpackUndoRecord(UnpackedUndoRecord *uur, Page page, int starting_byte,
+				 int *already_decoded, bool header_only)
+{
+	char	   *readptr = (char *) page + starting_byte;
+	char	   *endptr = (char *) page + BLCKSZ;
+	int			my_bytes_decoded = *already_decoded;
+	bool		is_undo_split = (my_bytes_decoded > 0);
+
+	/* Decode header (if not already done). */
+	if (!ReadUndoBytes((char *) &work_hdr, SizeOfUndoRecordHeader,
+					   &readptr, endptr,
+					   &my_bytes_decoded, already_decoded, false))
+		return false;
+
+	uur->uur_rmid = work_hdr.urec_rmid;
+	uur->uur_type = work_hdr.urec_type;
+	uur->uur_info = work_hdr.urec_info;
+	uur->uur_reloid = work_hdr.urec_reloid;
+	uur->uur_prevxid = work_hdr.urec_prevxid;
+	uur->uur_xid = work_hdr.urec_xid;
+	uur->uur_cid = work_hdr.urec_cid;
+
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0)
+	{
+		/* Decode relation details (if not already done). */
+		if (!ReadUndoBytes((char *) &work_rd, SizeOfUndoRecordRelationDetails,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_fork = work_rd.urec_fork;
+	}
+
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_blk, SizeOfUndoRecordBlock,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_blkprev = work_blk.urec_blkprev;
+		uur->uur_block = work_blk.urec_block;
+		uur->uur_offset = work_blk.urec_offset;
+	}
+
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_txn, SizeOfUndoRecordTransaction,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_progress = work_txn.urec_progress;
+		uur->uur_xidepoch = work_txn.urec_xidepoch;
+		uur->uur_dbid = work_txn.urec_dbid;
+		uur->uur_prevurp = work_txn.urec_prevurp;
+		uur->uur_next = work_txn.urec_next;
+	}
+
+	if (header_only)
+		return true;
+
+	/* Read payload information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_payload, SizeOfUndoRecordPayload,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_payload.len = work_payload.urec_payload_len;
+		uur->uur_tuple.len = work_payload.urec_tuple_len;
+
+		/*
+		 * If we can read the complete record from a single page then just
+		 * point payload data and tuple data into the page otherwise allocate
+		 * the memory.
+		 *
+		 * XXX As a possible optimization, instead of always allocating
+		 * memory whenever the record is split, we could check whether the
+		 * payload or the tuple data falls entirely within the current page
+		 * and, if so, avoid the allocation for that part.
+		 */
+		if (!is_undo_split &&
+			uur->uur_payload.len + uur->uur_tuple.len <= (endptr - readptr))
+		{
+			uur->uur_payload.data = readptr;
+			readptr += uur->uur_payload.len;
+
+			uur->uur_tuple.data = readptr;
+		}
+		else
+		{
+			if (uur->uur_payload.len > 0 && uur->uur_payload.data == NULL)
+				uur->uur_payload.data = (char *) palloc0(uur->uur_payload.len);
+
+			if (uur->uur_tuple.len > 0 && uur->uur_tuple.data == NULL)
+				uur->uur_tuple.data = (char *) palloc0(uur->uur_tuple.len);
+
+			if (!ReadUndoBytes((char *) uur->uur_payload.data,
+							   uur->uur_payload.len, &readptr, endptr,
+							   &my_bytes_decoded, already_decoded, false))
+				return false;
+
+			if (!ReadUndoBytes((char *) uur->uur_tuple.data,
+							   uur->uur_tuple.len, &readptr, endptr,
+							   &my_bytes_decoded, already_decoded, false))
+				return false;
+		}
+	}
+
+	return true;
+}
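+
+/*
+ * Illustrative sketch only: unpacking a record that may span pages, as
+ * described in the comment above.  'read_next_page' is a hypothetical
+ * stand-in for the caller's buffer-reading logic (presumably the role
+ * played by UndoGetOneRecord() in this patch):
+ *
+ *		UnpackedUndoRecord uur;
+ *		int		already_decoded = 0;
+ *
+ *		memset(&uur, 0, sizeof(uur));
+ *		while (!UnpackUndoRecord(&uur, page, starting_byte,
+ *								 &already_decoded, false))
+ *		{
+ *			page = read_next_page();
+ *			starting_byte = sizeof(PageHeaderData);
+ *		}
+ */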
+
+/*
+ * Read undo bytes into a particular destination.
+ *
+ * 'destptr' points to the destination buffer, and 'readlen' is the number
+ * of bytes to be read into it.
+ *
+ * 'readptr' points to the read point for these bytes, and is updated
+ * for how much we read.  The read point must not pass 'endptr', which
+ * represents the end of the buffer from which we are reading.
+ *
+ * 'my_bytes_read' is a pointer to the count of previously-read bytes
+ * from this and following structures in this undo record; that is, any
+ * bytes that are part of previous structures in the record have already
+ * been subtracted out.
+ *
+ * 'total_bytes_read' points to the count of all previously-read bytes,
+ * and must likewise be updated for the bytes we read.
+ *
+ * If 'nocopy' is true, the bytes are skipped over rather than copied into
+ * the destination buffer; only the bookkeeping counters are advanced.
+ *
+ * The return value is false if we ran out of data before reading all
+ * the bytes, and otherwise true.
+ */
+static bool
+ReadUndoBytes(char *destptr, int readlen, char **readptr, char *endptr,
+			  int *my_bytes_read, int *total_bytes_read, bool nocopy)
+{
+	int			can_read;
+	int			remaining;
+
+	if (*my_bytes_read >= readlen)
+	{
+		*my_bytes_read -= readlen;
+		return true;
+	}
+
+	/* Compute number of bytes we can read. */
+	remaining = readlen - *my_bytes_read;
+	can_read = Min(remaining, endptr - *readptr);
+
+	/* Bail out if no bytes can be read. */
+	if (can_read == 0)
+		return false;
+
+	/* Copy the bytes we can read. */
+	if (!nocopy)
+		memcpy(destptr + *my_bytes_read, *readptr, can_read);
+
+	/* Update bookkeeping information. */
+	*readptr += can_read;
+	*total_bytes_read += can_read;
+	*my_bytes_read = 0;
+
+	/* Return true only if we read the whole thing. */
+	return (can_read == remaining);
+}
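+
+/*
+ * Illustrative note (not from this patch): with nocopy = true a caller can
+ * step over bytes it does not need, e.g. skipping 'skiplen' payload bytes
+ * while keeping the bookkeeping counters consistent:
+ *
+ *		ReadUndoBytes(NULL, skiplen, &readptr, endptr,
+ *					  &my_bytes_read, &total_bytes_read, true);
+ */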
+
+/*
+ * Set uur_info for an UnpackedUndoRecord appropriately based on which
+ * other fields are set.
+ */
+void
+UndoRecordSetInfo(UnpackedUndoRecord *uur)
+{
+	if (uur->uur_fork != MAIN_FORKNUM)
+		uur->uur_info |= UREC_INFO_RELATION_DETAILS;
+	if (uur->uur_block != InvalidBlockNumber)
+		uur->uur_info |= UREC_INFO_BLOCK;
+	if (uur->uur_next != InvalidUndoRecPtr)
+		uur->uur_info |= UREC_INFO_TRANSACTION;
+	if (uur->uur_payload.len || uur->uur_tuple.len)
+		uur->uur_info |= UREC_INFO_PAYLOAD;
+}
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 7966a9e..592c338 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -47,6 +47,7 @@
 #define EpochFromFullTransactionId(x)	((uint32) ((x).value >> 32))
 #define XidFromFullTransactionId(x)		((uint32) (x).value)
 #define U64FromFullTransactionId(x)		((x).value)
+#define FullTransactionIdEquals(a, b)	((a).value == (b).value)
 #define FullTransactionIdPrecedes(a, b)	((a).value < (b).value)
 #define FullTransactionIdIsValid(x)		TransactionIdIsValid(XidFromFullTransactionId(x))
 #define InvalidFullTransactionId		FullTransactionIdFromEpochAndXid(0, InvalidTransactionId)
diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h
new file mode 100644
index 0000000..5693827
--- /dev/null
+++ b/src/include/access/undoinsert.h
@@ -0,0 +1,54 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoinsert.h
+ *	  entry points for inserting undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoinsert.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOINSERT_H
+#define UNDOINSERT_H
+
+#include "access/undolog.h"
+#include "access/undorecord.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+
+/*
+ * Typedef for callback function for UndoFetchRecord.
+ *
+ * This checks whether an undo record satisfies the given conditions.
+ */
+typedef bool (*SatisfyUndoRecordCallback) (UnpackedUndoRecord *urec,
+										   BlockNumber blkno,
+										   OffsetNumber offset,
+										   TransactionId xid);
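+
+/*
+ * A minimal illustrative callback (hypothetical, not part of this patch):
+ * accept only records that modified the given block and offset.
+ *
+ *		static bool
+ *		undo_record_matches(UnpackedUndoRecord *urec, BlockNumber blkno,
+ *							OffsetNumber offset, TransactionId xid)
+ *		{
+ *			return urec->uur_block == blkno && urec->uur_offset == offset;
+ *		}
+ */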
+
+extern UndoRecPtr PrepareUndoInsert(UnpackedUndoRecord *, FullTransactionId fxid,
+									UndoPersistence upersistence,
+									XLogReaderState *xlog_record);
+extern void InsertPreparedUndo(void);
+
+extern void RegisterUndoLogBuffers(uint8 first_block_id);
+extern void UndoLogBuffersSetLSN(XLogRecPtr recptr);
+extern void UnlockReleaseUndoBuffers(void);
+
+extern UnpackedUndoRecord *UndoFetchRecord(UndoRecPtr urp,
+				BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback);
+extern void UndoRecordRelease(UnpackedUndoRecord *urec);
+extern void UndoRecordSetPrevUndoLen(uint16 len);
+extern void UndoSetPrepareSize(int nrecords);
+extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, uint16 prevlen, UndoRecPtr prevurp);
+extern void ResetUndoBuffers(void);
+
+extern UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
+											UndoRecPtr urp, RelFileNode rnode,
+											UndoPersistence upersistence);
+
+#endif							/* UNDOINSERT_H */
diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h
new file mode 100644
index 0000000..bbe7861
--- /dev/null
+++ b/src/include/access/undorecord.h
@@ -0,0 +1,201 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.h
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorecord.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDORECORD_H
+#define UNDORECORD_H
+
+#include "access/undolog.h"
+#include "lib/stringinfo.h"
+#include "storage/block.h"
+#include "storage/bufpage.h"
+#include "storage/buf.h"
+#include "storage/off.h"
+
+
+/*
+ * Every undo record begins with an UndoRecordHeader structure, which is
+ * followed by the additional structures indicated by the contents of
+ * urec_info.  The structures are packed together without any alignment
+ * padding, and the undo record itself need not be aligned either, so care
+ * must be taken when reading the header.
+ */
+typedef struct UndoRecordHeader
+{
+	RmgrId		urec_rmid;		/* rmgr ID (XXX: does this create an
+								 * alignment hole?) */
+	uint8		urec_type;		/* record type code */
+	uint8		urec_info;		/* flag bits */
+	uint16		urec_prevlen;	/* length of previous record in bytes */
+	Oid			urec_reloid;	/* relation OID */
+
+	/*
+	 * Transaction id that has modified the tuple present in this undo record.
+	 * If this is older than oldestXidWithEpochHavingUndo, then we can
+	 * consider the tuple in this undo record as visible.
+	 */
+	TransactionId urec_prevxid;
+
+	/*
+	 * Transaction id that has modified the tuple for which this undo record
+	 * is written.  We use this to decide whether undo records can be
+	 * skipped.  See comments atop UndoFetchRecord.
+	 */
+	TransactionId urec_xid;		/* transaction id */
+	CommandId	urec_cid;		/* command id */
+} UndoRecordHeader;
+
+#define SizeOfUndoRecordHeader	\
+	(offsetof(UndoRecordHeader, urec_cid) + sizeof(CommandId))
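+
+/*
+ * Because an undo record is not necessarily aligned, readers should not
+ * cast a raw page pointer to UndoRecordHeader * and dereference it.  A
+ * safe pattern (illustrative only; 'readptr' is hypothetical) is to copy
+ * the bytes into an aligned local variable first:
+ *
+ *		UndoRecordHeader hdr;
+ *
+ *		memcpy(&hdr, readptr, SizeOfUndoRecordHeader);
+ *
+ * ReadUndoBytes() in undorecord.c does effectively this, additionally
+ * coping with records that are split across page boundaries.
+ */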
+
+/*
+ * If UREC_INFO_RELATION_DETAILS is set, an UndoRecordRelationDetails structure
+ * follows.
+ *
+ * If UREC_INFO_BLOCK is set, an UndoRecordBlock structure follows.
+ *
+ * If UREC_INFO_TRANSACTION is set, an UndoRecordTransaction structure
+ * follows.
+ *
+ * If UREC_INFO_PAYLOAD is set, an UndoRecordPayload structure follows.
+ *
+ * When (as will often be the case) multiple structures are present, they
+ * appear in the same order in which the constants are defined here.  That is,
+ * UndoRecordRelationDetails appears first.
+ */
+#define UREC_INFO_RELATION_DETAILS			0x01
+#define UREC_INFO_BLOCK						0x02
+#define UREC_INFO_PAYLOAD					0x04
+#define UREC_INFO_TRANSACTION				0x08
+
+/*
+ * Additional information about a relation to which this record pertains,
+ * namely the fork number.  If the fork number is MAIN_FORKNUM, this structure
+ * can (and should) be omitted.
+ */
+typedef struct UndoRecordRelationDetails
+{
+	ForkNumber	urec_fork;		/* fork number */
+} UndoRecordRelationDetails;
+
+#define SizeOfUndoRecordRelationDetails \
+	(offsetof(UndoRecordRelationDetails, urec_fork) + sizeof(ForkNumber))
+
+/*
+ * Identifying information for a block to which this record pertains, and
+ * a pointer to the previous record for the same block.
+ */
+typedef struct UndoRecordBlock
+{
+	UndoRecPtr	urec_blkprev;	/* byte offset of previous undo for block */
+	BlockNumber urec_block;		/* block number */
+	OffsetNumber urec_offset;	/* offset number */
+} UndoRecordBlock;
+
+#define SizeOfUndoRecordBlock \
+	(offsetof(UndoRecordBlock, urec_offset) + sizeof(OffsetNumber))
+
+/*
+ * Identifying information for a transaction to which this undo belongs.  This
+ * also stores the dbid and the progress of the undo apply during rollback.
+ */
+typedef struct UndoRecordTransaction
+{
+	/*
+	 * Indicates undo action apply progress: 0 means not started, 1 means
+	 * completed.  In the future, it could also be used to track how much of
+	 * the undo has been applied so far.
+	 */
+	uint32		urec_progress;
+	uint32		urec_xidepoch;	/* epoch of the current transaction */
+	Oid			urec_dbid;		/* database id */
+
+	/*
+	 * Transaction's previous undo record pointer, used when a transaction
+	 * spans undo logs.  The first undo record in the new log stores a
+	 * pointer to the transaction's previous undo record in the old log,
+	 * since that location cannot be computed from prevlen during rollback.
+	 */
+	UndoRecPtr	urec_prevurp;
+	UndoRecPtr	urec_next;		/* urec pointer of the next transaction */
+} UndoRecordTransaction;
+
+#define SizeOfUrecNext (sizeof(UndoRecPtr))
+#define SizeOfUndoRecordTransaction \
+	(offsetof(UndoRecordTransaction, urec_next) + SizeOfUrecNext)
+
+/*
+ * Information about the amount of payload data and tuple data present
+ * in this record.  The payload bytes immediately follow the structures
+ * specified by flag bits in urec_info, and the tuple bytes follow the
+ * payload bytes.
+ */
+typedef struct UndoRecordPayload
+{
+	uint16		urec_payload_len;	/* # of payload bytes */
+	uint16		urec_tuple_len; /* # of tuple bytes */
+} UndoRecordPayload;
+
+#define SizeOfUndoRecordPayload \
+	(offsetof(UndoRecordPayload, urec_tuple_len) + sizeof(uint16))
+
+/*
+ * Information that can be used to create an undo record or that can be
+ * extracted from one previously created.  The raw undo record format is
+ * difficult to manage, so this structure provides a convenient intermediate
+ * form that is easier for callers to manage.
+ *
+ * When creating an undo record from an UnpackedUndoRecord, the caller
+ * should set uur_info to 0.  It will be initialized by the first call to
+ * UndoRecordSetInfo or InsertUndoRecord.  We do set it in
+ * UndoRecordAllocate for transaction-specific header information.
+ *
+ * When an undo record is decoded into an UnpackedUndoRecord, all fields
+ * will be initialized, but those for which no information is available
+ * will be set to invalid or default values, as appropriate.
+ */
+typedef struct UnpackedUndoRecord
+{
+	RmgrId		uur_rmid;		/* rmgr ID */
+	uint8		uur_type;		/* record type code */
+	uint8		uur_info;		/* flag bits */
+	uint16		uur_prevlen;	/* length of previous record */
+	Oid			uur_reloid;		/* relation OID */
+	TransactionId uur_prevxid;	/* previous transaction id */
+	TransactionId uur_xid;		/* transaction id */
+	CommandId	uur_cid;		/* command id */
+	ForkNumber	uur_fork;		/* fork number */
+	UndoRecPtr	uur_blkprev;	/* byte offset of previous undo for block */
+	BlockNumber uur_block;		/* block number */
+	OffsetNumber uur_offset;	/* offset number */
+	Buffer		uur_buffer;		/* buffer into which the record's data
+								 * points */
+	uint32		uur_xidepoch;	/* epoch of the inserting transaction */
+	UndoRecPtr	uur_prevurp;	/* urec pointer to the previous record in the
+								 * different log */
+	UndoRecPtr	uur_next;		/* urec pointer of the next transaction */
+	Oid			uur_dbid;		/* database id */
+
+	/* undo applying progress, see detail comment in UndoRecordTransaction */
+	uint32		uur_progress;
+	StringInfoData uur_payload; /* payload bytes */
+	StringInfoData uur_tuple;	/* tuple bytes */
+} UnpackedUndoRecord;
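+
+/*
+ * Illustrative sketch only: one way a caller might fill in an
+ * UnpackedUndoRecord before preparing an insertion.  The rmgr ID and type
+ * code are hypothetical placeholders, as are 'reloid', 'blkno' and
+ * 'offnum':
+ *
+ *		UnpackedUndoRecord undorecord;
+ *
+ *		memset(&undorecord, 0, sizeof(undorecord));
+ *		undorecord.uur_rmid = RM_HEAP_ID;
+ *		undorecord.uur_type = 0;
+ *		undorecord.uur_info = 0;
+ *		undorecord.uur_reloid = reloid;
+ *		undorecord.uur_fork = MAIN_FORKNUM;
+ *		undorecord.uur_block = blkno;
+ *		undorecord.uur_offset = offnum;
+ *		initStringInfo(&undorecord.uur_payload);
+ *		initStringInfo(&undorecord.uur_tuple);
+ */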
+
+
+extern void UndoRecordSetInfo(UnpackedUndoRecord *uur);
+extern Size UndoRecordExpectedSize(UnpackedUndoRecord *uur);
+extern bool InsertUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_written,
+				 int remaining_bytes, uint16 undo_len, bool header_only);
+extern bool UnpackUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_decoded, bool header_only);
+
+#endif							/* UNDORECORD_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index d787f92..0d1b846 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -15,6 +15,7 @@
 #define XACT_H
 
 #include "access/transam.h"
+#include "access/undolog.h"
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
@@ -439,5 +440,6 @@ extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_ab
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void SetCurrentUndoLocation(UndoRecPtr urec_ptr);
 
 #endif							/* XACT_H */
-- 
1.8.3.1

