From b7c43ff5467ff8f266c3be60c8cbf6bec849da75 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Mon, 3 Dec 2018 10:33:50 +0530
Subject: [PATCH] undo-interface-v11

Provide an interface for preparing, inserting, and fetching undo
records.  This layer uses the undo-log storage layer to reserve space
for the undo records and the buffer management routines to write and
read them.

Dilip Kumar, with help from Rafia Sabih, based on an early prototype
for forming undo records by Robert Haas and design input from Amit Kapila.
Reviewed by Amit Kapila.
---
 src/backend/access/transam/xact.c    |   28 +
 src/backend/access/transam/xlog.c    |   30 +
 src/backend/access/undo/Makefile     |    2 +-
 src/backend/access/undo/undoinsert.c | 1192 ++++++++++++++++++++++++++++++++++
 src/backend/access/undo/undorecord.c |  461 +++++++++++++
 src/include/access/undoinsert.h      |   50 ++
 src/include/access/undorecord.h      |  187 ++++++
 src/include/access/xact.h            |    2 +
 src/include/access/xlog.h            |    1 +
 9 files changed, 1952 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/undo/undoinsert.c
 create mode 100644 src/backend/access/undo/undorecord.c
 create mode 100644 src/include/access/undoinsert.h
 create mode 100644 src/include/access/undorecord.h

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d967400..6060013 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -30,6 +30,7 @@
 #include "access/xlog.h"
 #include "access/xloginsert.h"
 #include "access/xlogutils.h"
+#include "access/undoinsert.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
@@ -66,6 +67,7 @@
 #include "utils/timestamp.h"
 #include "pg_trace.h"
 
+#define	AtAbort_ResetUndoBuffers() ResetUndoBuffers()
 
 /*
  *	User-tweakable parameters
@@ -189,6 +191,10 @@ typedef struct TransactionStateData
 	bool		startedInRecovery;	/* did we start in recovery? */
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;	/* Enter/ExitParallelMode counter */
+
+	/* Start and latest undo record locations for each persistence level. */
+	UndoRecPtr	start_urec_ptr[UndoPersistenceLevels];
+	UndoRecPtr	latest_urec_ptr[UndoPersistenceLevels];
 	struct TransactionStateData *parent;	/* back link to parent */
 } TransactionStateData;
 
@@ -912,6 +918,26 @@ IsInParallelMode(void)
 }
 
 /*
+ * SetCurrentUndoLocation - remember the start and latest undo record
+ * pointers of the current transaction for the undo log's persistence level.
+ */
+void
+SetCurrentUndoLocation(UndoRecPtr urec_ptr)
+{
+	UndoLogControl *log = UndoLogGet(UndoRecPtrGetLogNo(urec_ptr), false);
+	UndoPersistence upersistence = log->meta.persistence;
+
+	Assert(AmAttachedToUndoLog(log) || InRecovery);
+	/*
+	 * Set the start undo record pointer if this is the first undo record of
+	 * this (sub)transaction for this persistence level.
+	 */
+	if (!UndoRecPtrIsValid(CurrentTransactionState->start_urec_ptr[upersistence]))
+		CurrentTransactionState->start_urec_ptr[upersistence] = urec_ptr;
+	CurrentTransactionState->latest_urec_ptr[upersistence] = urec_ptr;
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
@@ -2631,6 +2657,7 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false);
 		AtEOXact_ApplyLauncher(false);
+		AtAbort_ResetUndoBuffers();
 		pgstat_report_xact_timestamp(0);
 	}
 
@@ -4815,6 +4842,7 @@ AbortSubTransaction(void)
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
 		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
+		AtAbort_ResetUndoBuffers();
 	}
 
 	/*
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 1064ee0..01815a6 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8323,6 +8323,36 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 }
 
 /*
+ * GetEpochForXid - get the epoch associated with the xid
+ */
+uint32
+GetEpochForXid(TransactionId xid)
+{
+	uint32		ckptXidEpoch;
+	TransactionId ckptXid;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	ckptXidEpoch = XLogCtl->ckptXidEpoch;
+	ckptXid = XLogCtl->ckptXid;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * The xid can be on either side of ckptXid when near wraparound, so a
+	 * plain numeric comparison is not enough.  If the xid is numerically
+	 * greater than ckptXid but logically precedes it, it belongs to the
+	 * previous epoch; if it is numerically less but logically follows
+	 * ckptXid, it must have wrapped into the next epoch.
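+	 *
+	 * For example (a sketch with 32-bit xids): if ckptXid is 100 in epoch n,
+	 * an xid of 4294967290 that logically precedes ckptXid must have been
+	 * assigned in epoch n - 1; conversely, if ckptXid is 4294967290, an xid
+	 * of 100 that logically follows it belongs to epoch n + 1.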
+	 */
+	if (xid > ckptXid &&
+		TransactionIdPrecedes(xid, ckptXid))
+		ckptXidEpoch--;
+	else if (xid < ckptXid &&
+			 TransactionIdFollows(xid, ckptXid))
+		ckptXidEpoch++;
+
+	return ckptXidEpoch;
+}
+
+/*
  * This must be called ONCE during postmaster or standalone-backend shutdown
  */
 void
diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile
index 219c696..f41e8f7 100644
--- a/src/backend/access/undo/Makefile
+++ b/src/backend/access/undo/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undolog.o
+OBJS = undoinsert.o undolog.o undorecord.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c
new file mode 100644
index 0000000..68ed822
--- /dev/null
+++ b/src/backend/access/undo/undoinsert.c
@@ -0,0 +1,1192 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoinsert.c
+ *	  entry points for inserting undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undoinsert.c
+ *
+ * NOTES:
+ * Undo record layout:
+ *
+ * Undo records are stored in sequential order in the undo log.  Each undo
+ * record consists of a variable-length header, tuple data, and payload
+ * information.  The first undo record of each transaction contains a
+ * transaction header that points to the next transaction's start header.
+ * This allows us to discard an entire transaction's undo in one shot rather
+ * than record by record.  Callers are not aware of the transaction header;
+ * it is entirely maintained and used by the undo record layer.  See
+ * undorecord.h for detailed information about the undo record header.
+ *
+ * Handling multiple logs:
+ *
+ * It is possible for a transaction's undo records to be spread across
+ * multiple undo logs, and we need some special handling while inserting the
+ * undo for discard and rollback to work sanely.
+ *
+ * If an undo record goes into the next log, we insert a transaction header
+ * for the first record in the new log and update the previous transaction
+ * header with the new log's location.  This allows us to connect a
+ * transaction's undo across logs when it spans more than one (for this we
+ * keep track of the previous logno in the undo log metadata), which is
+ * required to find the latest undo record pointer of an aborted transaction
+ * when executing the undo actions before discard.  If the next log gets
+ * processed first, we don't need to trace back to the actual start pointer
+ * of the transaction; in that case we can execute the undo actions from the
+ * current log only, because the undo pointer in the slot will be rewound,
+ * and that is enough to avoid executing the same actions twice.  However, it
+ * is possible that after the undo actions are executed the undo pointer gets
+ * discarded; later, while processing the previous log, we might try to fetch
+ * an undo record in the discarded log while chasing the transaction header
+ * chain.  To avoid this, we first check whether the transaction's next_urec
+ * has already been discarded; if so, there is no need to access it and we
+ * start executing from the last undo record in the current log.
+ *
+ * We connect to the next log only if the same transaction spreads into it.
+ *-------------------------------------------------------------------------
+ */
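+
+/*
+ * Expected calling sequence for inserting undo (a sketch; assumes a single
+ * undo record, no error paths, and the usual WAL insertion protocol):
+ *
+ *		urecptr = PrepareUndoInsert(&urec, xid, persistence);
+ *		... XLogBeginInsert(), register data, enter critical section ...
+ *		InsertPreparedUndo();
+ *		RegisterUndoLogBuffers(first_block_id);
+ *		recptr = XLogInsert(...);
+ *		UndoLogBuffersSetLSN(recptr);
+ *		... exit critical section ...
+ *		UnlockReleaseUndoBuffers();
+ *
+ * To prepare more than MAX_PREPARED_UNDO records at once, call
+ * UndoSetPrepareSize() first.
+ */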
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/undorecord.h"
+#include "access/undoinsert.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "miscadmin.h"
+#include "commands/tablecmds.h"
+
+/*
+ * XXX Do we want to support undo tuples whose size exceeds BLCKSZ?  If not,
+ * then an undo record can spread across at most two buffers.
+ */
+#define MAX_BUFFER_PER_UNDO    2
+
+/*
+ * This defines the number of undo records that can be prepared before
+ * calling insert by default.  If you need to prepare more than
+ * MAX_PREPARED_UNDO undo records, then you must call UndoSetPrepareSize
+ * first.
+ */
+#define MAX_PREPARED_UNDO 2
+
+/*
+ * Include the buffers needed for updating the previous transaction's
+ * starting undo record; hence the extra 1.
+ */
+#define MAX_UNDO_BUFFERS       ((MAX_PREPARED_UNDO + 1) * MAX_BUFFER_PER_UNDO)
+
+/*
+ * Previous top transaction id which inserted undo.  Whenever a new top
+ * transaction tries to prepare an undo record, we check whether its txid
+ * differs from prev_txid; if so, we insert a transaction start header.
+ */
+static TransactionId prev_txid[UndoPersistenceLevels] = {0};
+
+/* Undo block number to buffer mapping. */
+typedef struct UndoBuffers
+{
+	UndoLogNumber logno;		/* Undo log number */
+	BlockNumber blk;			/* block number */
+	Buffer		buf;			/* buffer allocated for the block */
+	bool		zero;			/* new block full of zeroes */
+} UndoBuffers;
+
+static UndoBuffers def_buffers[MAX_UNDO_BUFFERS];
+static int	buffer_idx;
+
+/*
+ * Structure to hold the prepared undo information.
+ */
+typedef struct PreparedUndoSpace
+{
+	UndoRecPtr	urp;			/* undo record pointer */
+	UnpackedUndoRecord *urec;	/* undo record */
+	int			undo_buffer_idx[MAX_BUFFER_PER_UNDO];	/* undo_buffer array
+														 * index */
+} PreparedUndoSpace;
+
+static PreparedUndoSpace def_prepared[MAX_PREPARED_UNDO];
+static int	prepare_idx;
+static int	max_prepared_undo = MAX_PREPARED_UNDO;
+static UndoRecPtr prepared_urec_ptr = InvalidUndoRecPtr;
+static bool update_prev_header = false;
+
+/*
+ * By default, prepared_undo and undo_buffer point to static memory.  If a
+ * caller needs to prepare more than the default number of undo records, the
+ * limit can be increased by calling UndoSetPrepareSize.  In that case,
+ * dynamic memory is allocated and prepared_undo and undo_buffer point to the
+ * newly allocated memory, which is released by UnlockReleaseUndoBuffers,
+ * after which these variables are set back to their default values.
+ */
+static PreparedUndoSpace *prepared_undo = def_prepared;
+static UndoBuffers *undo_buffer = def_buffers;
+
+/*
+ * Structure to hold the previous transaction's undo update information.
+ * This is populated while the current transaction is updating its undo
+ * record pointer in the previous transaction's first undo record.
+ */
+typedef struct XactUndoRecordInfo
+{
+	UndoRecPtr	urecptr;		/* txn's start urecptr */
+	int			idx_undo_buffers[MAX_BUFFER_PER_UNDO];
+	UnpackedUndoRecord uur;		/* undo record header */
+} XactUndoRecordInfo;
+
+static XactUndoRecordInfo xact_urec_info;
+
+/* Prototypes for static functions. */
+static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
+				 UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence);
+static void UndoRecordPrepareTransInfo(UndoRecPtr urecptr,
+						   bool log_switched);
+static void UndoRecordUpdateTransInfo(void);
+static int UndoGetBufferSlot(RelFileNode rnode, BlockNumber blk,
+				  ReadBufferMode rbm,
+				  UndoPersistence persistence);
+static bool UndoRecordIsValid(UndoLogControl * log,
+				  UndoRecPtr urp);
+
+/*
+ * Check whether the undo record has been discarded: return false if it has,
+ * otherwise return true.
+ *
+ * The caller must hold log->discard_lock.  This function releases the lock
+ * if it returns false; otherwise the lock is still held on return and the
+ * caller must release it.
+ */
+static bool
+UndoRecordIsValid(UndoLogControl * log, UndoRecPtr urp)
+{
+	Assert(LWLockHeldByMeInMode(&log->discard_lock, LW_SHARED));
+
+	if (log->oldest_data == InvalidUndoRecPtr)
+	{
+		/*
+		 * oldest_data is initialized only when the discard worker first
+		 * attempts to discard undo logs, so we cannot rely on it to decide
+		 * whether the undo record pointer has been discarded; instead, ask
+		 * the undo log layer.  If it's not yet discarded, we must reacquire
+		 * log->discard_lock so that the record doesn't get discarded
+		 * concurrently.
+		 */
+		LWLockRelease(&log->discard_lock);
+		if (UndoLogIsDiscarded(urp))
+			return false;
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+	}
+
+	/* Check again if it's already discarded. */
+	if (urp < log->oldest_data)
+	{
+		LWLockRelease(&log->discard_lock);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Prepare to update the previous transaction's next undo pointer, to
+ * maintain the transaction chain in the undo log.  This reads the header of
+ * the first undo record of the previous transaction and locks the necessary
+ * buffers.  The actual update is done by UndoRecordUpdateTransInfo inside
+ * the critical section.
+ */
+static void
+UndoRecordPrepareTransInfo(UndoRecPtr urecptr, bool log_switched)
+{
+	UndoRecPtr	xact_urp;
+	Buffer		buffer = InvalidBuffer;
+	BlockNumber cur_blk;
+	RelFileNode rnode;
+	UndoLogNumber logno = UndoRecPtrGetLogNo(urecptr);
+	UndoLogControl *log;
+	Page		page;
+	int			already_decoded = 0;
+	int			starting_byte;
+	int			bufidx;
+	int			index = 0;
+
+	log = UndoLogGet(logno, false);
+
+	if (log_switched)
+	{
+		Assert(log->meta.prevlogno != InvalidUndoLogNumber);
+		log = UndoLogGet(log->meta.prevlogno, false);
+	}
+
+	/*
+	 * Temporary undo logs are discarded on transaction commit so we don't
+	 * need to do anything.
+	 */
+	if (log->meta.persistence == UNDO_TEMP)
+		return;
+
+	/*
+	 * We can read the previous transaction's location without locking,
+	 * because only the backend attached to the log can write to it (or we're
+	 * in recovery).
+	 */
+	Assert(AmAttachedToUndoLog(log) || InRecovery || log_switched);
+
+	if (log->meta.last_xact_start == 0)
+		xact_urp = InvalidUndoRecPtr;
+	else
+		xact_urp = MakeUndoRecPtr(log->logno, log->meta.last_xact_start);
+
+	/*
+	 * Acquire the discard lock before accessing the undo record so that the
+	 * discard worker doesn't remove the record while we are reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	/*
+	 * The absence of the previous transaction's undo indicates that this
+	 * backend is preparing its first undo record, in which case there is
+	 * nothing to update.  UndoRecordIsValid will release the lock if it
+	 * returns false.
+	 */
+	if (!UndoRecordIsValid(log, xact_urp))
+		return;
+
+	UndoRecPtrAssignRelFileNode(rnode, xact_urp);
+	cur_blk = UndoRecPtrGetBlockNum(xact_urp);
+	starting_byte = UndoRecPtrGetPageOffset(xact_urp);
+
+	/*
+	 * Read the undo record header by calling UnpackUndoRecord; if the header
+	 * is split across buffers, we must invoke UnpackUndoRecord multiple
+	 * times to read the complete header.
+	 */
+	while (true)
+	{
+		bufidx = UndoGetBufferSlot(rnode, cur_blk,
+								   RBM_NORMAL,
+								   log->meta.persistence);
+		xact_urec_info.idx_undo_buffers[index++] = bufidx;
+		buffer = undo_buffer[bufidx].buf;
+		page = BufferGetPage(buffer);
+
+		if (UnpackUndoRecord(&xact_urec_info.uur, page, starting_byte,
+							 &already_decoded, true))
+			break;
+
+		/* Could not fetch the complete header so go to the next block. */
+		starting_byte = UndoLogBlockHeaderSize;
+		cur_blk++;
+	}
+
+	xact_urec_info.uur.uur_next = urecptr;
+	xact_urec_info.urecptr = xact_urp;
+	LWLockRelease(&log->discard_lock);
+}
+
+/*
+ * Overwrite the first undo record of the previous transaction to update its
+ * next pointer.  This just writes the record already prepared by
+ * UndoRecordPrepareTransInfo.  This must be called inside the critical
+ * section.  Only the undo record header is overwritten, not the data.
+ */
+static void
+UndoRecordUpdateTransInfo(void)
+{
+	UndoLogNumber logno = UndoRecPtrGetLogNo(xact_urec_info.urecptr);
+	Page		page;
+	int			starting_byte;
+	int			already_written = 0;
+	int			idx = 0;
+	UndoRecPtr	urec_ptr = InvalidUndoRecPtr;
+	UndoLogControl *log;
+
+	log = UndoLogGet(logno, false);
+	urec_ptr = xact_urec_info.urecptr;
+
+	/*
+	 * Acquire the discard lock before accessing the undo record so that the
+	 * discard worker can't remove the record while we are reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	if (!UndoRecordIsValid(log, urec_ptr))
+		return;
+
+	/*
+	 * Update the next transaction's start urecptr in the transaction header.
+	 */
+	starting_byte = UndoRecPtrGetPageOffset(urec_ptr);
+
+	do
+	{
+		Buffer		buffer;
+		int			buf_idx;
+
+		buf_idx = xact_urec_info.idx_undo_buffers[idx];
+		buffer = undo_buffer[buf_idx].buf;
+		page = BufferGetPage(buffer);
+
+		/* Overwrite the previously written undo. */
+		if (InsertUndoRecord(&xact_urec_info.uur, page, starting_byte, &already_written, true))
+		{
+			MarkBufferDirty(buffer);
+			break;
+		}
+
+		MarkBufferDirty(buffer);
+		starting_byte = UndoLogBlockHeaderSize;
+		idx++;
+
+		Assert(idx < MAX_BUFFER_PER_UNDO);
+	} while (true);
+
+	LWLockRelease(&log->discard_lock);
+}
+
+/*
+ * Find the block in the undo buffer array; if it's present, just return its
+ * index.  Otherwise, read the buffer, insert an entry into the array, and
+ * lock the buffer in exclusive mode.
+ *
+ * Undo log insertions are append-only.  If the caller is writing new data
+ * that begins exactly at the beginning of a page, then there cannot be any
+ * useful data after that point.  In that case RBM_ZERO can be passed in as
+ * rbm so that we can skip a useless read of a disk block.  In all other
+ * cases, RBM_NORMAL should be passed in, to read the page in if it doesn't
+ * happen to be already in the buffer pool.
+ */
+static int
+UndoGetBufferSlot(RelFileNode rnode,
+				  BlockNumber blk,
+				  ReadBufferMode rbm,
+				  UndoPersistence persistence)
+{
+	int			i;
+	Buffer		buffer;
+
+	/* Don't do anything, if we already have a buffer pinned for the block. */
+	for (i = 0; i < buffer_idx; i++)
+	{
+		/*
+		 * It's not enough to compare just the block number, because
+		 * undo_buffer might hold undo from different undo logs (e.g., when
+		 * the previous transaction's start header is in the previous undo
+		 * log), so compare both logno and blkno.
+		 */
+		if ((blk == undo_buffer[i].blk) &&
+			(undo_buffer[i].logno == rnode.relNode))
+		{
+			/* caller must hold exclusive lock on buffer */
+			Assert(BufferIsLocal(undo_buffer[i].buf) ||
+				   LWLockHeldByMeInMode(BufferDescriptorGetContentLock(
+																	   GetBufferDescriptor(undo_buffer[i].buf - 1)),
+										LW_EXCLUSIVE));
+			break;
+		}
+	}
+
+	/*
+	 * We did not find the block, so allocate the buffer and insert it into
+	 * the undo buffer array.
+	 */
+	if (i == buffer_idx)
+	{
+		/*
+		 * Fetch the buffer in which we want to insert the undo record.
+		 */
+		buffer = ReadBufferWithoutRelcache(rnode,
+										   UndoLogForkNum,
+										   blk,
+										   rbm,
+										   NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+
+		/* Lock the buffer */
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+		undo_buffer[buffer_idx].buf = buffer;
+		undo_buffer[buffer_idx].blk = blk;
+		undo_buffer[buffer_idx].logno = rnode.relNode;
+		undo_buffer[buffer_idx].zero = rbm == RBM_ZERO;
+		buffer_idx++;
+	}
+
+	return i;
+}
+
+/*
+ * Calculate the total size required by nrecords and allocate space for them
+ * in bulk.  This is required for operations that can write multiple undo
+ * records under a single WAL record, e.g. multi-insert.  If we don't
+ * allocate undo space for all such records together, they might end up in
+ * different undo logs, and during recovery we currently have no mechanism to
+ * map an xid to multiple log numbers for one WAL operation.  In short, all
+ * records written under one WAL record must allocate their undo from the
+ * same undo log.
+ */
+static UndoRecPtr
+UndoRecordAllocate(UnpackedUndoRecord *undorecords, int nrecords,
+				   TransactionId txid, UndoPersistence upersistence)
+{
+	UnpackedUndoRecord *urec = NULL;
+	UndoLogControl *log;
+	UndoRecordSize size;
+	UndoRecPtr	urecptr;
+	bool		need_xact_hdr = false;
+	bool		log_switched = false;
+	int			i;
+
+	/* There must be at least one undo record. */
+	if (nrecords <= 0)
+		elog(ERROR, "cannot allocate space for zero undo records");
+
+	/* Is this the first undo record of the transaction? */
+	if ((InRecovery && IsTransactionFirstRec(txid)) ||
+		(!InRecovery && prev_txid[upersistence] != txid))
+		need_xact_hdr = true;
+
+resize:
+	size = 0;
+
+	for (i = 0; i < nrecords; i++)
+	{
+		urec = undorecords + i;
+
+		/*
+		 * Prepare the transaction header for the first undo record of the
+		 * transaction.
+		 *
+		 * XXX Instead of adding this information to the first record, we
+		 * could prepare a new record containing only the transaction
+		 * information, but we don't see any clear advantage in doing so.
+		 */
+		if (need_xact_hdr && i == 0)
+		{
+			urec->uur_next = InvalidUndoRecPtr;
+			urec->uur_xidepoch = GetEpochForXid(txid);
+			urec->uur_progress = 0;
+
+			/* During recovery, get the database id from the undo log state. */
+			if (InRecovery)
+				urec->uur_dbid = UndoLogStateGetDatabaseId();
+			else
+				urec->uur_dbid = MyDatabaseId;
+
+			/* Set uur_info to include the transaction header. */
+			urec->uur_info |= UREC_INFO_TRANSACTION;
+		}
+		else
+			/*
+			 * It is okay to initialize these variables with invalid values,
+			 * as they are used only for the first record of the transaction.
+			 */
+			 */
+			urec->uur_next = InvalidUndoRecPtr;
+			urec->uur_xidepoch = 0;
+			urec->uur_dbid = 0;
+			urec->uur_progress = 0;
+		}
+
+		/* Calculate the size of the undo record based on the info required. */
+		UndoRecordSetInfo(urec);
+		size += UndoRecordExpectedSize(urec);
+	}
+
+	if (InRecovery)
+		urecptr = UndoLogAllocateInRecovery(txid, size, upersistence);
+	else
+		urecptr = UndoLogAllocate(size, upersistence);
+
+	log = UndoLogGet(UndoRecPtrGetLogNo(urecptr), false);
+
+	/*
+	 * By now, we must be attached to some undo log unless we are in recovery.
+	 */
+	Assert(AmAttachedToUndoLog(log) || InRecovery);
+
+	/*
+	 * We consider the log switched if this is the first record of the log
+	 * but not the first record of the transaction, i.e. the same transaction
+	 * has continued from the previous log.
+	 */
+	if ((UndoRecPtrGetOffset(urecptr) == UndoLogBlockHeaderSize) &&
+		log->meta.prevlogno != InvalidUndoLogNumber)
+		log_switched = true;
+
+	/*
+	 * If we've rewound all the way back to the start of the transaction by
+	 * rolling back the first subtransaction (which we can't detect until
+	 * after we've allocated some space), we'll need a new transaction header.
+	 * If we weren't already generating one, then do it now.
+	 */
+	if (!need_xact_hdr &&
+		(log->meta.insert == log->meta.last_xact_start ||
+		 UndoRecPtrGetOffset(urecptr) == UndoLogBlockHeaderSize))
+	{
+		need_xact_hdr = true;
+		urec->uur_info = 0;		/* force recomputation of info bits */
+		goto resize;
+	}
+
+	/* Update the previous transaction's start undo record, if required. */
+	if (need_xact_hdr || log_switched)
+	{
+		/* Don't update our own start header. */
+		if (log->meta.last_xact_start != log->meta.insert)
+			UndoRecordPrepareTransInfo(urecptr, log_switched);
+
+		/* Remember the current transaction's xid. */
+		prev_txid[upersistence] = txid;
+
+		/* Store the current transaction's start undorecptr in the undo log. */
+		UndoLogSetLastXactStartPoint(urecptr);
+		update_prev_header = false;
+	}
+
+	UndoLogAdvance(urecptr, size, upersistence);
+
+	return urecptr;
+}
+
+/*
+ * Call UndoSetPrepareSize to set how many undo records can be prepared
+ * before they are inserted.  If the count is greater than MAX_PREPARED_UNDO,
+ * extra memory is allocated to hold the additional prepared undo records.
+ *
+ * This is normally used when more than one undo record needs to be prepared.
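+ *
+ * For example, to prepare several records in one go (a sketch; undo_ptr is a
+ * hypothetical caller-side array):
+ *
+ *		UndoSetPrepareSize(undorecords, nrecords, xid, upersistence);
+ *		for (i = 0; i < nrecords; i++)
+ *			undo_ptr[i] = PrepareUndoInsert(&undorecords[i], xid, upersistence);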
+ */
+void
+UndoSetPrepareSize(UnpackedUndoRecord *undorecords, int nrecords,
+				   TransactionId xid, UndoPersistence upersistence)
+{
+	TransactionId txid;
+
+	/* Get the top transaction id. */
+	if (xid == InvalidTransactionId)
+	{
+		Assert(!InRecovery);
+		txid = GetTopTransactionId();
+	}
+	else
+	{
+		Assert(InRecovery || (xid == GetTopTransactionId()));
+		txid = xid;
+	}
+
+	prepared_urec_ptr = UndoRecordAllocate(undorecords, nrecords, txid,
+										   upersistence);
+	if (nrecords <= MAX_PREPARED_UNDO)
+		return;
+
+	prepared_undo = palloc0(nrecords * sizeof(PreparedUndoSpace));
+
+	/*
+	 * Include the buffers needed for updating the previous transaction's
+	 * starting undo record; hence the extra 1.
+	 */
+	undo_buffer = palloc0((nrecords + 1) * MAX_BUFFER_PER_UNDO *
+						  sizeof(UndoBuffers));
+	max_prepared_undo = nrecords;
+}
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record
+ * you intend to insert.  Upon return, the necessary undo buffers are pinned
+ * and locked.
+ *
+ * This should be done before any critical section is established, since it
+ * can fail.
+ *
+ * In recovery, 'xid' refers to the transaction id stored in WAL; otherwise,
+ * it refers to the top transaction id, because the undo log stores mappings
+ * only for top-level transactions.
+ */
+UndoRecPtr
+PrepareUndoInsert(UnpackedUndoRecord *urec, TransactionId xid,
+				  UndoPersistence upersistence)
+{
+	UndoRecordSize size;
+	UndoRecPtr	urecptr;
+	RelFileNode rnode;
+	UndoRecordSize cur_size = 0;
+	BlockNumber cur_blk;
+	TransactionId txid;
+	int			starting_byte;
+	int			index = 0;
+	int			bufidx;
+	ReadBufferMode rbm;
+
+	/* Already reached maximum prepared limit. */
+	if (prepare_idx == max_prepared_undo)
+		elog(ERROR, "already reached the maximum prepared limit");
+
+	if (xid == InvalidTransactionId)
+	{
+		/* During recovery, we must have a valid transaction id. */
+		Assert(!InRecovery);
+		txid = GetTopTransactionId();
+	}
+	else
+	{
+		/*
+		 * Assign the top transaction id because the undo log stores mappings
+		 * only for top-level transactions.
+		 */
+		Assert(InRecovery || (xid == GetTopTransactionId()));
+		txid = xid;
+	}
+
+	if (!UndoRecPtrIsValid(prepared_urec_ptr))
+		urecptr = UndoRecordAllocate(urec, 1, txid, upersistence);
+	else
+		urecptr = prepared_urec_ptr;
+
+	/* Advance the prepared pointer location for the next record. */
+	size = UndoRecordExpectedSize(urec);
+	if (UndoRecPtrIsValid(prepared_urec_ptr))
+	{
+		UndoLogOffset insert = UndoRecPtrGetOffset(prepared_urec_ptr);
+
+		insert = UndoLogOffsetPlusUsableBytes(insert, size);
+		prepared_urec_ptr = MakeUndoRecPtr(UndoRecPtrGetLogNo(urecptr), insert);
+	}
+
+	cur_blk = UndoRecPtrGetBlockNum(urecptr);
+	UndoRecPtrAssignRelFileNode(rnode, urecptr);
+	starting_byte = UndoRecPtrGetPageOffset(urecptr);
+
+	/*
+	 * If we happen to be writing the very first byte into this page, then
+	 * there is no need to read from disk.
+	 */
+	if (starting_byte == UndoLogBlockHeaderSize)
+		rbm = RBM_ZERO;
+	else
+		rbm = RBM_NORMAL;
+
+	do
+	{
+		bufidx = UndoGetBufferSlot(rnode, cur_blk, rbm, upersistence);
+		if (cur_size == 0)
+			cur_size = BLCKSZ - starting_byte;
+		else
+			cur_size += BLCKSZ - UndoLogBlockHeaderSize;
+
+		/* An undo record can't use more than MAX_BUFFER_PER_UNDO buffers. */
+		Assert(index < MAX_BUFFER_PER_UNDO);
+
+		/* Keep track of the buffers we have pinned and locked. */
+		prepared_undo[prepare_idx].undo_buffer_idx[index++] = bufidx;
+
+		/*
+		 * If we need more pages, they'll all be new, so we can definitely
+		 * skip reading from disk.
+		 */
+		rbm = RBM_ZERO;
+		cur_blk++;
+	} while (cur_size < size);
+
+	/*
+	 * Save the undo record information to be later used by InsertPreparedUndo
+	 * to insert the prepared record.
+	 */
+	prepared_undo[prepare_idx].urec = urec;
+	prepared_undo[prepare_idx].urp = urecptr;
+	prepare_idx++;
+
+	return urecptr;
+}
+
+/*
+ * Insert a previously-prepared undo record.  This writes the actual undo
+ * record into the buffers already pinned and locked by PrepareUndoInsert,
+ * and marks them dirty.  This step should be performed inside a critical
+ * section; it should never fail.
+ */
+void
+InsertPreparedUndo(void)
+{
+	Page		page;
+	int			starting_byte;
+	int			already_written;
+	int			bufidx = 0;
+	int			idx;
+	uint16		undo_len = 0;
+	UndoRecPtr	urp;
+	UnpackedUndoRecord *uur;
+	UndoLogOffset offset;
+	UndoLogControl *log;
+
+	/* There must be at least one prepared undo record. */
+	Assert(prepare_idx > 0);
+
+	/*
+	 * This must be called inside a critical section, or we must be in recovery.
+	 */
+	Assert(InRecovery || CritSectionCount > 0);
+
+	for (idx = 0; idx < prepare_idx; idx++)
+	{
+		uur = prepared_undo[idx].urec;
+		urp = prepared_undo[idx].urp;
+
+		already_written = 0;
+		bufidx = 0;
+		starting_byte = UndoRecPtrGetPageOffset(urp);
+		offset = UndoRecPtrGetOffset(urp);
+
+		log = UndoLogGet(UndoRecPtrGetLogNo(urp), false);
+		Assert(AmAttachedToUndoLog(log) || InRecovery);
+
+		/*
+		 * Store the previous undo record length in the header.  We can read
+		 * meta.prevlen without locking, because only we can write to it.
+		 */
+		uur->uur_prevlen = log->meta.prevlen;
+
+		/*
+		 * If we are starting a new log, there is no prevlen to store, except
+		 * when the same transaction is continuing from the previous undo
+		 * log; see the detailed comment at the top of this file.
+		 */
+		if (offset == UndoLogBlockHeaderSize)
+		{
+			if (log->meta.prevlogno != InvalidUndoLogNumber)
+			{
+				UndoLogControl *prevlog = UndoLogGet(log->meta.prevlogno, false);
+
+				uur->uur_prevlen = prevlog->meta.prevlen;
+			}
+			else
+				uur->uur_prevlen = 0;
+		}
+
+		/*
+		 * If starting on a new page, account for the block header size in
+		 * the prevlen calculation.
+		 */
+		else if (starting_byte == UndoLogBlockHeaderSize)
+			uur->uur_prevlen += UndoLogBlockHeaderSize;
+
+		undo_len = 0;
+
+		do
+		{
+			PreparedUndoSpace undospace = prepared_undo[idx];
+			Buffer		buffer;
+
+			buffer = undo_buffer[undospace.undo_buffer_idx[bufidx]].buf;
+			page = BufferGetPage(buffer);
+
+			/*
+			 * Initialize the page whenever we write the first record on a
+			 * page.  We start writing immediately after the block header.
+			 */
+			if (starting_byte == UndoLogBlockHeaderSize)
+				PageInit(page, BLCKSZ, 0);
+
+			/*
+			 * Try to insert the record into the current page.  If it doesn't
+			 * succeed, call the routine again with the next page.
+			 */
+			if (InsertUndoRecord(uur, page, starting_byte, &already_written, false))
+			{
+				undo_len += already_written;
+				MarkBufferDirty(buffer);
+				break;
+			}
+
+			MarkBufferDirty(buffer);
+
+			/*
+			 * If we are switching to the next block, include the header in
+			 * the total undo length.
+			 */
+			starting_byte = UndoLogBlockHeaderSize;
+			undo_len += UndoLogBlockHeaderSize;
+			bufidx++;
+
+			/* An undo record can't use more than MAX_BUFFER_PER_UNDO buffers. */
+			Assert(bufidx < MAX_BUFFER_PER_UNDO);
+		} while (true);
+
+		UndoLogSetPrevLen(UndoRecPtrGetLogNo(urp), undo_len);
+
+		/*
+		 * Link the transactions in the same log so that we can discard all
+		 * of a transaction's undo in one shot.
+		 */
+		if (UndoRecPtrIsValid(xact_urec_info.urecptr))
+			UndoRecordUpdateTransInfo();
+
+		/*
+		 * Set the current undo location for the transaction.  This is
+		 * required to perform rollback if the transaction aborts.
+		 */
+		SetCurrentUndoLocation(urp);
+	}
+}
+
+/*
+ * Helper function for UndoFetchRecord.  It fetches the undo record pointed
+ * to by urp and unpacks it into urec.  This function does not release the
+ * pin on the buffer if the complete record is fetched from one buffer, so
+ * the caller can reuse the same urec to fetch another undo record on the
+ * same block.  The caller is responsible for releasing the buffer inside
+ * urec and setting it to InvalidBuffer if it wishes to fetch a record from
+ * another block.
+ */
+static UnpackedUndoRecord *
+UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence)
+{
+	Buffer		buffer = urec->uur_buffer;
+	Page		page;
+	int			starting_byte = UndoRecPtrGetPageOffset(urp);
+	int			already_decoded = 0;
+	BlockNumber cur_blk;
+	bool		is_undo_rec_split = false;
+
+	cur_blk = UndoRecPtrGetBlockNum(urp);
+
+	/* If we already have a buffer pin then no need to allocate a new one. */
+	if (!BufferIsValid(buffer))
+	{
+		buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, cur_blk,
+										   RBM_NORMAL, NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+
+		urec->uur_buffer = buffer;
+	}
+
+	while (true)
+	{
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buffer);
+
+		/*
+		 * XXX This could be optimized to fetch just the header first, and
+		 * fetch the complete record only if the block number and offset
+		 * match.
+		 */
+		if (UnpackUndoRecord(urec, page, starting_byte, &already_decoded, false))
+			break;
+
+		starting_byte = UndoLogBlockHeaderSize;
+		is_undo_rec_split = true;
+
+		/*
+		 * The record spans more than one page, so we would have copied it (see
+		 * UnpackUndoRecord).  In such cases, we can release the buffer.
+		 */
+		urec->uur_buffer = InvalidBuffer;
+		UnlockReleaseBuffer(buffer);
+
+		/* Go to next block. */
+		cur_blk++;
+		buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, cur_blk,
+										   RBM_NORMAL, NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+	}
+
+	/*
+	 * If we have copied the data then release the buffer, otherwise, just
+	 * unlock it.
+	 */
+	if (is_undo_rec_split)
+		UnlockReleaseBuffer(buffer);
+	else
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+	return urec;
+}
+
+/*
+ * ResetUndoRecord - Helper function for UndoFetchRecord to reset the current
+ * record.
+ */
+static void
+ResetUndoRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode *rnode,
+				RelFileNode *prevrec_rnode)
+{
+	/*
+	 * If we have a valid buffer pinned, keep it only if the next record is
+	 * to be fetched from the same block; otherwise release the buffer and
+	 * set it to InvalidBuffer.
+	 */
+	if (BufferIsValid(urec->uur_buffer))
+	{
+		/*
+		 * Undo buffer will be changed if the next undo record belongs to a
+		 * different block or undo log.
+		 */
+		if ((UndoRecPtrGetBlockNum(urp) !=
+			 BufferGetBlockNumber(urec->uur_buffer)) ||
+			(prevrec_rnode->relNode != rnode->relNode))
+		{
+			ReleaseBuffer(urec->uur_buffer);
+			urec->uur_buffer = InvalidBuffer;
+		}
+	}
+	else
+	{
+		/*
+		 * If there is no valid buffer in urec->uur_buffer, that means we
+		 * copied the payload data and tuple data, so free them.
+		 */
+		if (urec->uur_payload.data)
+			pfree(urec->uur_payload.data);
+		if (urec->uur_tuple.data)
+			pfree(urec->uur_tuple.data);
+	}
+
+	/* Reset the urec before fetching the tuple */
+	urec->uur_tuple.data = NULL;
+	urec->uur_tuple.len = 0;
+	urec->uur_payload.data = NULL;
+	urec->uur_payload.len = 0;
+}
+
+/*
+ * Fetch the next undo record for the given blkno, offset, and transaction id
+ * (if valid).  The same tuple can be modified by multiple transactions, so
+ * during undo chain traversal we sometimes need to distinguish based on
+ * transaction id.  Callers that don't have any such requirement can pass
+ * InvalidTransactionId.
+ *
+ * Start the search from urp.  The caller must call UndoRecordRelease to
+ * release the resources allocated by this function.
+ *
+ * If a valid pointer is passed as urec_ptr_out, it is set to the undo record
+ * pointer of the qualifying undo record.
+ *
+ * The callback function decides whether a particular undo record satisfies
+ * the caller's condition.
+ *
+ * Returns the required undo record if found; otherwise returns NULL, which
+ * means either the record has already been discarded or no such record
+ * exists in the undo chain.
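+ *
+ * For example (a sketch; satisfies_cb is a hypothetical callback of type
+ * SatisfyUndoRecordCallback):
+ *
+ *		uur = UndoFetchRecord(urp, blkno, offset, xid, &urec_ptr, satisfies_cb);
+ *		if (uur != NULL)
+ *		{
+ *			... examine uur ...
+ *			UndoRecordRelease(uur);
+ *		}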
+ */
+UnpackedUndoRecord *
+UndoFetchRecord(UndoRecPtr urp, BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback)
+{
+	RelFileNode rnode,
+				prevrec_rnode = {0};
+	UnpackedUndoRecord *urec = NULL;
+	int			logno;
+
+	if (urec_ptr_out)
+		*urec_ptr_out = InvalidUndoRecPtr;
+
+	urec = palloc0(sizeof(UnpackedUndoRecord));
+	UndoRecPtrAssignRelFileNode(rnode, urp);
+
+	/* Find the undo record pointer we are interested in. */
+	while (true)
+	{
+		UndoLogControl *log;
+
+		logno = UndoRecPtrGetLogNo(urp);
+		log = UndoLogGet(logno, false);
+		if (log == NULL)
+		{
+			if (BufferIsValid(urec->uur_buffer))
+				ReleaseBuffer(urec->uur_buffer);
+			return NULL;
+		}
+
+		/*
+		 * Prevent UndoDiscardOneLog() from discarding data while we try to
+		 * read it.  Usually we would acquire log->mutex to read log->meta
+		 * members, but in this case we know that discard can't move without
+		 * also holding log->discard_lock.
+		 */
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+		if (!UndoRecordIsValid(log, urp))
+		{
+			if (BufferIsValid(urec->uur_buffer))
+				ReleaseBuffer(urec->uur_buffer);
+			return NULL;
+		}
+
+		/* Fetch the current undo record. */
+		urec = UndoGetOneRecord(urec, urp, rnode, log->meta.persistence);
+		LWLockRelease(&log->discard_lock);
+
+		if (blkno == InvalidBlockNumber)
+			break;
+
+		/* Check whether the undo record satisfies the conditions. */
+		if (callback(urec, blkno, offset, xid))
+			break;
+
+		urp = urec->uur_blkprev;
+		prevrec_rnode = rnode;
+
+		/* Get rnode for the current undo record pointer. */
+		UndoRecPtrAssignRelFileNode(rnode, urp);
+
+		/* Reset the current undorecord before fetching the next. */
+		ResetUndoRecord(urec, urp, &rnode, &prevrec_rnode);
+	}
+
+	if (urec_ptr_out)
+		*urec_ptr_out = urp;
+	return urec;
+}
+
+/*
+ * Return the previous undo record pointer.
+ *
+ * This API can switch to the previous log if the given undo record pointer
+ * is the first record of the current log, so the caller shouldn't use it
+ * where that is not expected.
+ */
+UndoRecPtr
+UndoGetPrevUndoRecptr(UndoRecPtr urp, uint16 prevlen)
+{
+	UndoLogNumber logno = UndoRecPtrGetLogNo(urp);
+	UndoLogOffset offset = UndoRecPtrGetOffset(urp);
+
+	/*
+	 * We have reached the first undo record of this undo log, so fetch the
+	 * previous undo record of the transaction from the previous log.
+	 */
+	if (offset == UndoLogBlockHeaderSize)
+	{
+		UndoLogControl *prevlog,
+				   *log;
+
+		log = UndoLogGet(logno, false);
+
+		Assert(log->meta.prevlogno != InvalidUndoLogNumber);
+
+		/* Fetch the previous log control. */
+		prevlog = UndoLogGet(log->meta.prevlogno, true);
+		logno = log->meta.prevlogno;
+		offset = prevlog->meta.insert;
+	}
+
+	/* calculate the previous undo record pointer */
+	return MakeUndoRecPtr(logno, offset - prevlen);
+}
+
+/*
+ * Release the resources allocated by UndoFetchRecord.
+ */
+void
+UndoRecordRelease(UnpackedUndoRecord *urec)
+{
+	/*
+	 * If the undo record has a valid buffer then just release the buffer
+	 * otherwise free the tuple and payload data.
+	 */
+	if (BufferIsValid(urec->uur_buffer))
+	{
+		ReleaseBuffer(urec->uur_buffer);
+	}
+	else
+	{
+		if (urec->uur_payload.data)
+			pfree(urec->uur_payload.data);
+		if (urec->uur_tuple.data)
+			pfree(urec->uur_tuple.data);
+	}
+
+	pfree(urec);
+}
+
+/*
+ * RegisterUndoLogBuffers - Register the undo buffers.
+ */
+void
+RegisterUndoLogBuffers(uint8 first_block_id)
+{
+	int			idx;
+	int			flags;
+
+	for (idx = 0; idx < buffer_idx; idx++)
+	{
+		flags = undo_buffer[idx].zero ? REGBUF_WILL_INIT : 0;
+		XLogRegisterBuffer(first_block_id + idx, undo_buffer[idx].buf, flags);
+	}
+}
+
+/*
+ * UndoLogBuffersSetLSN - Set the LSN on the undo pages.
+ */
+void
+UndoLogBuffersSetLSN(XLogRecPtr recptr)
+{
+	int			idx;
+
+	for (idx = 0; idx < buffer_idx; idx++)
+		PageSetLSN(BufferGetPage(undo_buffer[idx].buf), recptr);
+}
+
+/*
+ * Reset the global variables related to undo buffers.  This is required on
+ * transaction abort and when releasing the undo buffers.
+ */
+void
+ResetUndoBuffers(void)
+{
+	int			i;
+
+	for (i = 0; i < buffer_idx; i++)
+	{
+		undo_buffer[i].blk = InvalidBlockNumber;
+		undo_buffer[i].buf = InvalidBuffer;
+	}
+
+	xact_urec_info.urecptr = InvalidUndoRecPtr;
+
+	/* Reset the prepared index. */
+	prepare_idx = 0;
+	buffer_idx = 0;
+	prepared_urec_ptr = InvalidUndoRecPtr;
+
+	/*
+	 * If the max_prepared_undo limit was changed, free the allocated memory
+	 * and reset all the variables back to their default values.
+	 */
+	if (max_prepared_undo > MAX_PREPARED_UNDO)
+	{
+		pfree(undo_buffer);
+		pfree(prepared_undo);
+		undo_buffer = def_buffers;
+		prepared_undo = def_prepared;
+		max_prepared_undo = MAX_PREPARED_UNDO;
+	}
+}
+
+/*
+ * Unlock and release the undo buffers.  This step must be performed after
+ * exiting any critical section in which we have performed undo actions.
+ */
+void
+UnlockReleaseUndoBuffers(void)
+{
+	int			i;
+
+	for (i = 0; i < buffer_idx; i++)
+		UnlockReleaseBuffer(undo_buffer[i].buf);
+
+	ResetUndoBuffers();
+}
diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c
new file mode 100644
index 0000000..b44c41d
--- /dev/null
+++ b/src/backend/access/undo/undorecord.c
@@ -0,0 +1,461 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.c
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorecord.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/undorecord.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+
+/* Workspace for InsertUndoRecord and UnpackUndoRecord. */
+static UndoRecordHeader work_hdr;
+static UndoRecordRelationDetails work_rd;
+static UndoRecordBlock work_blk;
+static UndoRecordTransaction work_txn;
+static UndoRecordPayload work_payload;
+
+/* Prototypes for static functions. */
+static bool InsertUndoBytes(char *sourceptr, int sourcelen,
+				char **writeptr, char *endptr,
+				int *my_bytes_written, int *total_bytes_written);
+static bool ReadUndoBytes(char *destptr, int readlen,
+			  char **readptr, char *endptr,
+			  int *my_bytes_read, int *total_bytes_read, bool nocopy);
+
+/*
+ * Compute and return the expected size of an undo record.
+ */
+Size
+UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+{
+	Size		size;
+
+	size = SizeOfUndoRecordHeader;
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0)
+		size += SizeOfUndoRecordRelationDetails;
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+		size += SizeOfUndoRecordBlock;
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+		size += SizeOfUndoRecordTransaction;
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		size += SizeOfUndoRecordPayload;
+		size += uur->uur_payload.len;
+		size += uur->uur_tuple.len;
+	}
+
+	return size;
+}
+
+/*
+ * To insert an undo record, call InsertUndoRecord() repeatedly until it
+ * returns true.
+ *
+ * Insert as much of an undo record as will fit in the given page.
+ * starting_byte is the byte within the given page at which to begin writing,
+ * while *already_written is the number of bytes written to previous pages.
+ *
+ * Returns true if the remainder of the record was written and false if more
+ * bytes remain to be written; in either case, *already_written is set to the
+ * number of bytes written thus far.
+ *
+ * This function assumes that if *already_written is non-zero on entry, the
+ * same UnpackedUndoRecord is passed each time.  It also assumes that
+ * UnpackUndoRecord is not called between successive calls to InsertUndoRecord
+ * for the same UnpackedUndoRecord.
+ *
+ * If this function is called again to continue writing the record, the
+ * previous value for *already_written should be passed again, and
+ * starting_byte should be passed as sizeof(PageHeaderData) (since the record
+ * will continue immediately following the page header).
+ *
+ * This function sets uur->uur_info as a side effect.
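+ *
+ * A typical insertion loop therefore looks like this (a sketch; fetching the
+ * next undo page is left abstract):
+ *
+ *		int already_written = 0;
+ *
+ *		while (!InsertUndoRecord(uur, page, starting_byte, &already_written,
+ *								 false))
+ *		{
+ *			page = ... next undo page ...;
+ *			starting_byte = sizeof(PageHeaderData);
+ *		}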
+ */
+bool
+InsertUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_written, bool header_only)
+{
+	char	   *writeptr = (char *) page + starting_byte;
+	char	   *endptr = (char *) page + BLCKSZ;
+	int			my_bytes_written = *already_written;
+
+	/* The undo record must contain valid information. */
+	Assert(uur->uur_info != 0);
+
+	/*
+	 * If this is the first call, copy the UnpackedUndoRecord into the
+	 * temporary variables of the types that will actually be stored in the
+	 * undo pages.  We just initialize everything here, on the assumption that
+	 * it's not worth adding branches to save a handful of assignments.
+	 */
+	if (*already_written == 0)
+	{
+		work_hdr.urec_type = uur->uur_type;
+		work_hdr.urec_info = uur->uur_info;
+		work_hdr.urec_prevlen = uur->uur_prevlen;
+		work_hdr.urec_reloid = uur->uur_reloid;
+		work_hdr.urec_prevxid = uur->uur_prevxid;
+		work_hdr.urec_xid = uur->uur_xid;
+		work_hdr.urec_cid = uur->uur_cid;
+		work_rd.urec_fork = uur->uur_fork;
+		work_blk.urec_blkprev = uur->uur_blkprev;
+		work_blk.urec_block = uur->uur_block;
+		work_blk.urec_offset = uur->uur_offset;
+		work_txn.urec_next = uur->uur_next;
+		work_txn.urec_xidepoch = uur->uur_xidepoch;
+		work_txn.urec_progress = uur->uur_progress;
+		work_txn.urec_dbid = uur->uur_dbid;
+		work_payload.urec_payload_len = uur->uur_payload.len;
+		work_payload.urec_tuple_len = uur->uur_tuple.len;
+	}
+	else
+	{
+		/*
+		 * We should have been passed the same record descriptor as before,
+		 * or the caller has messed up.
+		 */
+		Assert(work_hdr.urec_type == uur->uur_type);
+		Assert(work_hdr.urec_info == uur->uur_info);
+		Assert(work_hdr.urec_prevlen == uur->uur_prevlen);
+		Assert(work_hdr.urec_reloid == uur->uur_reloid);
+		Assert(work_hdr.urec_prevxid == uur->uur_prevxid);
+		Assert(work_hdr.urec_xid == uur->uur_xid);
+		Assert(work_hdr.urec_cid == uur->uur_cid);
+		Assert(work_rd.urec_fork == uur->uur_fork);
+		Assert(work_blk.urec_blkprev == uur->uur_blkprev);
+		Assert(work_blk.urec_block == uur->uur_block);
+		Assert(work_blk.urec_offset == uur->uur_offset);
+		Assert(work_txn.urec_next == uur->uur_next);
+		Assert(work_txn.urec_progress == uur->uur_progress);
+		Assert(work_txn.urec_dbid == uur->uur_dbid);
+		Assert(work_payload.urec_payload_len == uur->uur_payload.len);
+		Assert(work_payload.urec_tuple_len == uur->uur_tuple.len);
+	}
+
+	/* Write header (if not already done). */
+	if (!InsertUndoBytes((char *) &work_hdr, SizeOfUndoRecordHeader,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write relation details (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0 &&
+		!InsertUndoBytes((char *) &work_rd, SizeOfUndoRecordRelationDetails,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write block information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0 &&
+		!InsertUndoBytes((char *) &work_blk, SizeOfUndoRecordBlock,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write transaction information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0 &&
+		!InsertUndoBytes((char *) &work_txn, SizeOfUndoRecordTransaction,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	if (header_only)
+		return true;
+
+	/* Write payload information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		/* Payload header. */
+		if (!InsertUndoBytes((char *) &work_payload, SizeOfUndoRecordPayload,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+
+		/* Payload bytes. */
+		if (uur->uur_payload.len > 0 &&
+			!InsertUndoBytes(uur->uur_payload.data, uur->uur_payload.len,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+
+		/* Tuple bytes. */
+		if (uur->uur_tuple.len > 0 &&
+			!InsertUndoBytes(uur->uur_tuple.data, uur->uur_tuple.len,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+	}
+
+	/* Hooray! */
+	return true;
+}
+
+/*
+ * Write undo bytes from a particular source, but only to the extent that
+ * they weren't written previously and will fit.
+ *
+ * 'sourceptr' points to the source data, and 'sourcelen' is the length of
+ * that data in bytes.
+ *
+ * 'writeptr' points to the insertion point for these bytes, and is updated
+ * for whatever we write.  The insertion point must not pass 'endptr', which
+ * represents the end of the buffer into which we are writing.
+ *
+ * 'my_bytes_written' is a pointer to the count of previously-written bytes
+ * from this and following structures in this undo record; that is, any
+ * bytes that are part of previous structures in the record have already
+ * been subtracted out.  We must update it for the bytes we write.
+ *
+ * 'total_bytes_written' points to the count of all previously-written bytes,
+ * and must likewise be updated for the bytes we write.
+ *
+ * The return value is false if we ran out of space before writing all
+ * the bytes, and otherwise true.
+ */
+static bool
+InsertUndoBytes(char *sourceptr, int sourcelen,
+				char **writeptr, char *endptr,
+				int *my_bytes_written, int *total_bytes_written)
+{
+	int			can_write;
+	int			remaining;
+
+	/*
+	 * If we've previously written all of these bytes, there's nothing to do
+	 * except update *my_bytes_written, which we must do to ensure that the
+	 * next call to this function gets the right starting value.
+	 */
+	if (*my_bytes_written >= sourcelen)
+	{
+		*my_bytes_written -= sourcelen;
+		return true;
+	}
+
+	/* Compute number of bytes we can write. */
+	remaining = sourcelen - *my_bytes_written;
+	can_write = Min(remaining, endptr - *writeptr);
+
+	/* Bail out if no bytes can be written. */
+	if (can_write == 0)
+		return false;
+
+	/* Copy the bytes we can write. */
+	memcpy(*writeptr, sourceptr + *my_bytes_written, can_write);
+
+	/* Update bookkeeping information. */
+	*writeptr += can_write;
+	*total_bytes_written += can_write;
+	*my_bytes_written = 0;
+
+	/* Return true only if we wrote the whole thing. */
+	return (can_write == remaining);
+}
+
+/*
+ * Call UnpackUndoRecord() one or more times to unpack an undo record.  For
+ * the first call, starting_byte should be set to the beginning of the undo
+ * record within the specified page, and *already_decoded should be set to 0;
+ * the function will update it based on the number of bytes decoded.  The
+ * return value is true if the entire record was unpacked and false if the
+ * record continues on the next page.  In the latter case, the function
+ * should be called again with the next page, passing starting_byte as
+ * sizeof(PageHeaderData).
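+ *
+ * The calling convention mirrors InsertUndoRecord(); e.g. (a sketch):
+ *
+ *		int already_decoded = 0;
+ *
+ *		while (!UnpackUndoRecord(uur, page, starting_byte, &already_decoded,
+ *								 false))
+ *		{
+ *			page = ... next undo page ...;
+ *			starting_byte = sizeof(PageHeaderData);
+ *		}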
+ */
+bool
+UnpackUndoRecord(UnpackedUndoRecord *uur, Page page, int starting_byte,
+				 int *already_decoded, bool header_only)
+{
+	char	   *readptr = (char *) page + starting_byte;
+	char	   *endptr = (char *) page + BLCKSZ;
+	int			my_bytes_decoded = *already_decoded;
+	bool		is_undo_split = (my_bytes_decoded > 0);
+
+	/* Decode header (if not already done). */
+	if (!ReadUndoBytes((char *) &work_hdr, SizeOfUndoRecordHeader,
+					   &readptr, endptr,
+					   &my_bytes_decoded, already_decoded, false))
+		return false;
+
+	uur->uur_type = work_hdr.urec_type;
+	uur->uur_info = work_hdr.urec_info;
+	uur->uur_prevlen = work_hdr.urec_prevlen;
+	uur->uur_reloid = work_hdr.urec_reloid;
+	uur->uur_prevxid = work_hdr.urec_prevxid;
+	uur->uur_xid = work_hdr.urec_xid;
+	uur->uur_cid = work_hdr.urec_cid;
+
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0)
+	{
+		/* Decode relation details (if not already done). */
+		if (!ReadUndoBytes((char *) &work_rd, SizeOfUndoRecordRelationDetails,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_fork = work_rd.urec_fork;
+	}
+
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_blk, SizeOfUndoRecordBlock,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_blkprev = work_blk.urec_blkprev;
+		uur->uur_block = work_blk.urec_block;
+		uur->uur_offset = work_blk.urec_offset;
+	}
+
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_txn, SizeOfUndoRecordTransaction,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_next = work_txn.urec_next;
+		uur->uur_xidepoch = work_txn.urec_xidepoch;
+		uur->uur_progress = work_txn.urec_progress;
+		uur->uur_dbid = work_txn.urec_dbid;
+	}
+
+	if (header_only)
+		return true;
+
+	/* Read payload information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_payload, SizeOfUndoRecordPayload,
+						   &readptr, endptr,
+						   &my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_payload.len = work_payload.urec_payload_len;
+		uur->uur_tuple.len = work_payload.urec_tuple_len;
+
+		/*
+		 * If we can read the complete record from a single page, then just
+		 * point the payload data and tuple data into the page; otherwise
+		 * allocate memory.
+		 *
+		 * XXX A possible optimization: instead of always allocating memory
+		 * whenever the record is split, we could check whether the payload
+		 * or tuple data falls entirely within one page and, if so, avoid
+		 * allocating memory for that part.
+		 */
+		if (!is_undo_split &&
+			uur->uur_payload.len + uur->uur_tuple.len <= (endptr - readptr))
+		{
+			uur->uur_payload.data = readptr;
+			readptr += uur->uur_payload.len;
+
+			uur->uur_tuple.data = readptr;
+		}
+		else
+		{
+			if (uur->uur_payload.len > 0 && uur->uur_payload.data == NULL)
+				uur->uur_payload.data = (char *) palloc0(uur->uur_payload.len);
+
+			if (uur->uur_tuple.len > 0 && uur->uur_tuple.data == NULL)
+				uur->uur_tuple.data = (char *) palloc0(uur->uur_tuple.len);
+
+			if (!ReadUndoBytes((char *) uur->uur_payload.data,
+							   uur->uur_payload.len, &readptr, endptr,
+							   &my_bytes_decoded, already_decoded, false))
+				return false;
+
+			if (!ReadUndoBytes((char *) uur->uur_tuple.data,
+							   uur->uur_tuple.len, &readptr, endptr,
+							   &my_bytes_decoded, already_decoded, false))
+				return false;
+		}
+	}
+
+	return true;
+}
+
+/*
+ * Read undo bytes into a particular destination.
+ *
+ * 'destptr' points to the destination, and 'readlen' is the number of bytes
+ * to read into it.
+ *
+ * 'readptr' points to the read point for these bytes, and is updated
+ * for how much we read.  The read point must not pass 'endptr', which
+ * represents the end of the buffer from which we are reading.
+ *
+ * 'my_bytes_read' is a pointer to the count of previously-read bytes
+ * from this and following structures in this undo record; that is, any
+ * bytes that are part of previous structures in the record have already
+ * been subtracted out.  We must update it for the bytes we read.
+ *
+ * 'total_bytes_read' points to the count of all previously-read bytes,
+ * and must likewise be updated for the bytes we read.
+ *
+ * If 'nocopy' is true, just skip over 'readlen' bytes of undo without
+ * copying them into the destination.
+ *
+ * The return value is false if we ran out of space before reading all
+ * the bytes, and otherwise true.
+ */
+static bool
+ReadUndoBytes(char *destptr, int readlen, char **readptr, char *endptr,
+			  int *my_bytes_read, int *total_bytes_read, bool nocopy)
+{
+	int			can_read;
+	int			remaining;
+
+	if (*my_bytes_read >= readlen)
+	{
+		*my_bytes_read -= readlen;
+		return true;
+	}
+
+	/* Compute number of bytes we can read. */
+	remaining = readlen - *my_bytes_read;
+	can_read = Min(remaining, endptr - *readptr);
+
+	/* Bail out if no bytes can be read. */
+	if (can_read == 0)
+		return false;
+
+	/* Copy the bytes we can read. */
+	if (!nocopy)
+		memcpy(destptr + *my_bytes_read, *readptr, can_read);
+
+	/* Update bookkeeping information. */
+	*readptr += can_read;
+	*total_bytes_read += can_read;
+	*my_bytes_read = 0;
+
+	/* Return true only if we read the whole thing. */
+	return (can_read == remaining);
+}
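+
+#ifdef NOT_USED
+/*
+ * Illustrative sketch only (compiled out): the typical pattern for driving
+ * ReadUndoBytes when a structure may be split across pages.  The names used
+ * here mirror those in UnpackUndoRecord above; this is not an entry point
+ * of this module.
+ */
+static bool
+ReadUndoBytesExample(char *readptr, char *endptr, int *already_decoded)
+{
+	UndoRecordHeader work_hdr;
+	int			my_bytes_decoded = *already_decoded;
+
+	/*
+	 * Copy the fixed-size header; if this page runs out first, the caller
+	 * must retry with the next page and the updated 'already_decoded'.
+	 */
+	if (!ReadUndoBytes((char *) &work_hdr, SizeOfUndoRecordHeader,
+					   &readptr, endptr,
+					   &my_bytes_decoded, already_decoded, false))
+		return false;
+
+	/* Skip the block structure without copying it, by passing nocopy. */
+	return ReadUndoBytes(NULL, SizeOfUndoRecordBlock,
+						 &readptr, endptr,
+						 &my_bytes_decoded, already_decoded, true);
+}
+#endif							/* NOT_USED */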
+
+/*
+ * Set uur_info for an UnpackedUndoRecord appropriately based on which
+ * other fields are set.
+ */
+void
+UndoRecordSetInfo(UnpackedUndoRecord *uur)
+{
+	if (uur->uur_fork != MAIN_FORKNUM)
+		uur->uur_info |= UREC_INFO_RELATION_DETAILS;
+	if (uur->uur_block != InvalidBlockNumber)
+		uur->uur_info |= UREC_INFO_BLOCK;
+	if (uur->uur_next != InvalidUndoRecPtr)
+		uur->uur_info |= UREC_INFO_TRANSACTION;
+	if (uur->uur_payload.len || uur->uur_tuple.len)
+		uur->uur_info |= UREC_INFO_PAYLOAD;
+}
diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h
new file mode 100644
index 0000000..2256dbe
--- /dev/null
+++ b/src/include/access/undoinsert.h
@@ -0,0 +1,50 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoinsert.h
+ *	  entry points for inserting undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoinsert.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOINSERT_H
+#define UNDOINSERT_H
+
+#include "access/undolog.h"
+#include "access/undorecord.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+
+/*
+ * Typedef for callback function for UndoFetchRecord.
+ *
+ * This checks whether an undo record satisfies the given conditions.
+ */
+typedef bool (*SatisfyUndoRecordCallback) (UnpackedUndoRecord *urec,
+										   BlockNumber blkno,
+										   OffsetNumber offset,
+										   TransactionId xid);
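+
+/*
+ * A hypothetical callback might look like the sketch below; each caller
+ * (e.g. an access method walking its undo chains) supplies its own test:
+ *
+ *	static bool
+ *	UndoRecSatisfiesBlkOff(UnpackedUndoRecord *urec, BlockNumber blkno,
+ *						   OffsetNumber offset, TransactionId xid)
+ *	{
+ *		return urec->uur_block == blkno && urec->uur_offset == offset;
+ *	}
+ */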
+
+extern UndoRecPtr PrepareUndoInsert(UnpackedUndoRecord *urec,
+				  TransactionId xid, UndoPersistence upersistence);
+extern void InsertPreparedUndo(void);
+
+extern void RegisterUndoLogBuffers(uint8 first_block_id);
+extern void UndoLogBuffersSetLSN(XLogRecPtr recptr);
+extern void UnlockReleaseUndoBuffers(void);
+
+extern UnpackedUndoRecord *UndoFetchRecord(UndoRecPtr urp,
+				BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback);
+extern void UndoRecordRelease(UnpackedUndoRecord *urec);
+extern void UndoRecordSetPrevUndoLen(uint16 len);
+extern void UndoSetPrepareSize(UnpackedUndoRecord *undorecords, int nrecords,
+				   TransactionId xid, UndoPersistence upersistence);
+extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, uint16 prevlen);
+extern void ResetUndoBuffers(void);
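+
+/*
+ * A plausible calling sequence for writing undo (a sketch only; the exact
+ * protocol, including WAL logging, is up to the caller):
+ *
+ *	urecptr = PrepareUndoInsert(&rec, xid, UNDO_PERMANENT);
+ *	START_CRIT_SECTION();
+ *	InsertPreparedUndo();
+ *	... emit the WAL record covering this operation ...
+ *	END_CRIT_SECTION();
+ *	UnlockReleaseUndoBuffers();
+ *
+ * where 'rec' is a caller-built UnpackedUndoRecord.
+ */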
+
+#endif							/* UNDOINSERT_H */
diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h
new file mode 100644
index 0000000..ab4dd4c
--- /dev/null
+++ b/src/include/access/undorecord.h
@@ -0,0 +1,187 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.h
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorecord.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDORECORD_H
+#define UNDORECORD_H
+
+#include "access/undolog.h"
+#include "lib/stringinfo.h"
+#include "storage/block.h"
+#include "storage/bufpage.h"
+#include "storage/buf.h"
+#include "storage/off.h"
+
+
+/*
+ * Every undo record begins with an UndoRecordHeader structure, which is
+ * followed by the additional structures indicated by the contents of
+ * urec_info.  All structures are packed without any alignment padding, and
+ * the undo record itself need not be aligned either, so care must be taken
+ * when reading the header.
+ */
+typedef struct UndoRecordHeader
+{
+	uint8		urec_type;		/* record type code */
+	uint8		urec_info;		/* flag bits */
+	uint16		urec_prevlen;	/* length of previous record in bytes */
+	Oid			urec_reloid;	/* relation OID */
+
+	/*
+	 * Transaction id that modified the tuple present in this undo record.
+	 * If this is older than oldestXidWithEpochHavingUndo, then we can
+	 * consider the tuple in this undo record as visible.
+	 */
+	TransactionId urec_prevxid;
+
+	/*
+	 * Transaction id that modified the tuple for which this undo record is
+	 * written.  We use this to skip undo records; see the comments atop
+	 * UndoFetchRecord.
+	 */
+	TransactionId urec_xid;		/* transaction id */
+	CommandId	urec_cid;		/* command id */
+} UndoRecordHeader;
+
+#define SizeOfUndoRecordHeader	\
+	(offsetof(UndoRecordHeader, urec_cid) + sizeof(CommandId))
+
+/*
+ * If UREC_INFO_RELATION_DETAILS is set, an UndoRecordRelationDetails structure
+ * follows.
+ *
+ * If UREC_INFO_BLOCK is set, an UndoRecordBlock structure follows.
+ *
+ * If UREC_INFO_TRANSACTION is set, an UndoRecordTransaction structure
+ * follows.
+ *
+ * If UREC_INFO_PAYLOAD is set, an UndoRecordPayload structure follows.
+ *
+ * When (as will often be the case) multiple structures are present, they
+ * appear in the same order in which the constants are defined here.  That is,
+ * UndoRecordRelationDetails appears first.
+ */
+#define UREC_INFO_RELATION_DETAILS			0x01
+#define UREC_INFO_BLOCK						0x02
+#define UREC_INFO_PAYLOAD					0x04
+#define UREC_INFO_TRANSACTION				0x08
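+
+/*
+ * As an illustration, a record whose urec_info is
+ * UREC_INFO_BLOCK | UREC_INFO_PAYLOAD is laid out as follows, with no
+ * padding between the pieces:
+ *
+ *	UndoRecordHeader
+ *	UndoRecordBlock
+ *	UndoRecordPayload
+ *	<urec_payload_len bytes of payload data>
+ *	<urec_tuple_len bytes of tuple data>
+ */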
+
+/*
+ * Additional information about a relation to which this record pertains,
+ * namely the fork number.  If the fork number is MAIN_FORKNUM, this structure
+ * can (and should) be omitted.
+ */
+typedef struct UndoRecordRelationDetails
+{
+	ForkNumber	urec_fork;		/* fork number */
+} UndoRecordRelationDetails;
+
+#define SizeOfUndoRecordRelationDetails \
+	(offsetof(UndoRecordRelationDetails, urec_fork) + sizeof(uint8))
+
+/*
+ * Identifying information for a block to which this record pertains, and
+ * a pointer to the previous record for the same block.
+ */
+typedef struct UndoRecordBlock
+{
+	uint64		urec_blkprev;	/* byte offset of previous undo for block */
+	BlockNumber urec_block;		/* block number */
+	OffsetNumber urec_offset;	/* offset number */
+} UndoRecordBlock;
+
+#define SizeOfUndoRecordBlock \
+	(offsetof(UndoRecordBlock, urec_offset) + sizeof(OffsetNumber))
+
+/*
+ * Identifying information for a transaction to which this undo belongs.  This
+ * also stores the dbid and the progress of the undo apply during rollback.
+ */
+typedef struct UndoRecordTransaction
+{
+	/*
+	 * This indicates the progress of applying undo actions: 0 means not
+	 * started and 1 means completed.  In the future, it could also be used
+	 * to show how much of the undo has been applied so far, using some
+	 * formula.
+	 */
+	uint32		urec_xidepoch;	/* epoch of the current transaction */
+	Oid			urec_dbid;		/* database id */
+	uint64		urec_next;		/* undo record pointer of the next transaction */
+} UndoRecordTransaction;
+
+#define SizeOfUrecNext (sizeof(UndoRecPtr))
+#define SizeOfUndoRecordTransaction \
+	(offsetof(UndoRecordTransaction, urec_next) + SizeOfUrecNext)
+
+/*
+ * Information about the amount of payload data and tuple data present
+ * in this record.  The payload bytes immediately follow the structures
+ * specified by flag bits in urec_info, and the tuple bytes follow the
+ * payload bytes.
+ */
+typedef struct UndoRecordPayload
+{
+	uint16		urec_payload_len;	/* # of payload bytes */
+	uint16		urec_tuple_len; /* # of tuple bytes */
+} UndoRecordPayload;
+
+#define SizeOfUndoRecordPayload \
+	(offsetof(UndoRecordPayload, urec_tuple_len) + sizeof(uint16))
+
+/*
+ * Information that can be used to create an undo record or that can be
+ * extracted from one previously created.  The raw undo record format is
+ * difficult to manage, so this structure provides a convenient intermediate
+ * form that is easier for callers to manage.
+ *
+ * When creating an undo record from an UnpackedUndoRecord, the caller
+ * should set uur_info to 0.  It will be initialized by the first call to
+ * UndoRecordSetInfo or InsertUndoRecord.  We do set it in
+ * UndoRecordAllocate for the transaction-specific header information.
+ *
+ * When an undo record is decoded into an UnpackedUndoRecord, all fields
+ * will be initialized, but those for which no information is available
+ * will be set to invalid or default values, as appropriate.
+ */
+typedef struct UnpackedUndoRecord
+{
+	uint8		uur_type;		/* record type code */
+	uint8		uur_info;		/* flag bits */
+	uint16		uur_prevlen;	/* length of previous record */
+	Oid			uur_reloid;		/* relation OID */
+	TransactionId uur_prevxid;	/* transaction id */
+	TransactionId uur_xid;		/* transaction id */
+	CommandId	uur_cid;		/* command id */
+	ForkNumber	uur_fork;		/* fork number */
+	uint64		uur_blkprev;	/* byte offset of previous undo for block */
+	BlockNumber uur_block;		/* block number */
+	OffsetNumber uur_offset;	/* offset number */
+	Buffer		uur_buffer;		/* buffer into which the undo record data points */
+	uint32		uur_xidepoch;	/* epoch of the inserting transaction */
+	uint64		uur_next;		/* undo record pointer of the next transaction */
+	Oid			uur_dbid;		/* database id */
+
+	/* undo apply progress; see the detailed comment in UndoRecordTransaction */
+	uint32		uur_progress;
+	StringInfoData uur_payload; /* payload bytes */
+	StringInfoData uur_tuple;	/* tuple bytes */
+} UnpackedUndoRecord;
+
+
+extern void UndoRecordSetInfo(UnpackedUndoRecord *uur);
+extern Size UndoRecordExpectedSize(UnpackedUndoRecord *uur);
+extern bool InsertUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_written, bool header_only);
+extern bool UnpackUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_decoded, bool header_only);
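+
+/*
+ * Sketch of how a caller might fill in an UnpackedUndoRecord before sizing
+ * and packing it (the field values are hypothetical; a real access method
+ * chooses its own type codes and payload):
+ *
+ *	UnpackedUndoRecord rec;
+ *
+ *	memset(&rec, 0, sizeof(rec));
+ *	rec.uur_type = ...;					(AM-specific operation code)
+ *	rec.uur_reloid = relid;
+ *	rec.uur_xid = xid;
+ *	rec.uur_fork = MAIN_FORKNUM;
+ *	rec.uur_block = blkno;
+ *	rec.uur_offset = offnum;
+ *
+ *	UndoRecordSetInfo(&rec);
+ *	size = UndoRecordExpectedSize(&rec);
+ */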
+
+#endif							/* UNDORECORD_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 689c57c..73394c5 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -14,6 +14,7 @@
 #ifndef XACT_H
 #define XACT_H
 
+#include "access/undolog.h"
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
@@ -430,5 +431,6 @@ extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_ab
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void SetCurrentUndoLocation(UndoRecPtr urec_ptr);
 
 #endif							/* XACT_H */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index f3a7ba4..d4e742f 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -310,6 +310,7 @@ extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
+extern uint32 GetEpochForXid(TransactionId xid);
 extern void RemovePromoteSignalFiles(void);
 
 extern bool CheckPromoteSignal(void);
-- 
1.8.3.1

