From 1718d0e3666884d533abc9519bf7df597b980a52 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilip.kumar@enterprisedb.com>
Date: Wed, 14 Nov 2018 01:17:03 -0800
Subject: [PATCH] undo-interface-v3

Provide an interface for preparing, inserting, and fetching undo
records. This layer uses the undo-log-storage layer to reserve space for
the undo records and the buffer management routines to write and read
them.

Dilip Kumar, with help from Rafia Sabih, based on an early prototype
for forming undo records by Robert Haas and design inputs from Amit Kapila
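
The intended call sequence is roughly as follows (an illustrative sketch
only, not taken verbatim from this patch; UNDO_PERMANENT, first_block_id,
rmid and info stand in for caller-specific values, and the XLog* calls are
the usual xloginsert.c routines):

    UnpackedUndoRecord undorecord = { ... };   /* filled in by the caller */
    UndoRecPtr  urecptr;
    XLogRecPtr  recptr;

    /* Pin and lock undo buffers; can fail, so no critical section yet. */
    urecptr = PrepareUndoInsert(&undorecord, UNDO_PERMANENT,
                                InvalidTransactionId);

    XLogBeginInsert();
    /* ... register the caller's own data and buffers here ... */

    START_CRIT_SECTION();
    InsertPreparedUndo();
    RegisterUndoLogBuffers(first_block_id);
    recptr = XLogInsert(rmid, info);
    UndoLogBuffersSetLSN(recptr);
    END_CRIT_SECTION();

    UnlockReleaseUndoBuffers();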
---
 src/backend/access/transam/xact.c    |   24 +
 src/backend/access/transam/xlog.c    |   30 +
 src/backend/access/undo/Makefile     |    2 +-
 src/backend/access/undo/undoinsert.c | 1172 ++++++++++++++++++++++++++++++++++
 src/backend/access/undo/undorecord.c |  459 +++++++++++++
 src/include/access/undoinsert.h      |  106 +++
 src/include/access/undorecord.h      |  216 +++++++
 src/include/access/xact.h            |    2 +
 src/include/access/xlog.h            |    1 +
 9 files changed, 2011 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/undo/undoinsert.c
 create mode 100644 src/backend/access/undo/undorecord.c
 create mode 100644 src/include/access/undoinsert.h
 create mode 100644 src/include/access/undorecord.h

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index a979d7e..6b7f7fa 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -189,6 +189,10 @@ typedef struct TransactionStateData
 	bool		startedInRecovery;	/* did we start in recovery? */
 	bool		didLogXid;		/* has xid been included in WAL record? */
 	int			parallelModeLevel;	/* Enter/ExitParallelMode counter */
+
+	/* Start and end undo record locations for each persistence level. */
+	UndoRecPtr	start_urec_ptr[UndoPersistenceLevels];
+	UndoRecPtr	latest_urec_ptr[UndoPersistenceLevels];
 	struct TransactionStateData *parent;	/* back link to parent */
 } TransactionStateData;
 
@@ -912,6 +916,26 @@ IsInParallelMode(void)
 }
 
 /*
+ * SetCurrentUndoLocation
+ */
+void
+SetCurrentUndoLocation(UndoRecPtr urec_ptr)
+{
+	UndoLogControl *log = UndoLogGet(UndoRecPtrGetLogNo(urec_ptr), false);
+	UndoPersistence upersistence = log->meta.persistence;
+
+	Assert(AmAttachedToUndoLog(log) || InRecovery);
+	/*
+	 * Set the start undo record pointer if this is the first undo record of
+	 * this (sub)transaction at this persistence level.
+	 */
+	if (!UndoRecPtrIsValid(CurrentTransactionState->start_urec_ptr[upersistence]))
+		CurrentTransactionState->start_urec_ptr[upersistence] = urec_ptr;
+	CurrentTransactionState->latest_urec_ptr[upersistence] = urec_ptr;
+
+}
+
+/*
  *	CommandCounterIncrement
  */
 void
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dce4c01..36c161e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -8511,6 +8511,36 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch)
 }
 
 /*
+ * GetEpochForXid - get the epoch associated with the xid
+ */
+uint32
+GetEpochForXid(TransactionId xid)
+{
+	uint32		ckptXidEpoch;
+	TransactionId ckptXid;
+
+	SpinLockAcquire(&XLogCtl->info_lck);
+	ckptXidEpoch = XLogCtl->ckptXidEpoch;
+	ckptXid = XLogCtl->ckptXid;
+	SpinLockRelease(&XLogCtl->info_lck);
+
+	/*
+	 * Xid can be on either side of ckptXid when near wrap-around.  If xid is
+	 * numerically greater than ckptXid but logically precedes it, it belongs
+	 * to the previous epoch.  If it is numerically smaller but logically
+	 * follows ckptXid, it has wrapped around into the next epoch.
+	 */
+	if (xid > ckptXid &&
+		TransactionIdPrecedes(xid, ckptXid))
+		ckptXidEpoch--;
+	else if (xid < ckptXid &&
+			 TransactionIdFollows(xid, ckptXid))
+		ckptXidEpoch++;
+
+	return ckptXidEpoch;
+}
+
+/*
  * This must be called ONCE during postmaster or standalone-backend shutdown
  */
 void
diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile
index 219c696..f41e8f7 100644
--- a/src/backend/access/undo/Makefile
+++ b/src/backend/access/undo/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undolog.o
+OBJS = undoinsert.o undolog.o undorecord.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/undo/undoinsert.c b/src/backend/access/undo/undoinsert.c
new file mode 100644
index 0000000..935b3ad
--- /dev/null
+++ b/src/backend/access/undo/undoinsert.c
@@ -0,0 +1,1172 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoinsert.c
+ *	  entry points for inserting undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undoinsert.c
+ *
+ * NOTES:
+ * Undo record layout:
+ *
+ *  Undo records are stored in sequential order in the undo log.  Each
+ *  transaction's first undo record, a.k.a. the transaction header, points to
+ *  the next transaction's start header.  Transaction headers are linked so
+ *  that the discard worker can read the undo log transaction by transaction
+ *  without having to read every undo record.
+ *
+ * Handling multiple logs:
+ *
+ *  The undo records of a single transaction can be spread across multiple
+ *  undo logs, which requires some special handling at insert time so that
+ *  discard and rollback work sanely.
+ *
+ *  If an undo record goes into the next log, we insert a transaction header
+ *  for the first record in the new log and update the previous transaction
+ *  header with the new log's location.  This lets us connect a transaction
+ *  across logs when it spans more than one log (for this we keep track of
+ *  the previous logno in the undo log metadata), which is required to find
+ *  the latest undo record pointer of an aborted transaction when executing
+ *  the undo actions before discard.  If the next log happens to be processed
+ *  first, we don't need to trace back to the actual start pointer of the
+ *  transaction; in that case we can execute the undo actions from the
+ *  current log only, because the undo pointer in the slot will be rewound
+ *  and that is enough to avoid executing the same actions twice.  However,
+ *  it is possible that after the undo actions have been executed, the undo
+ *  pointer gets discarded; later, while processing the previous log, we
+ *  might try to fetch an undo record in the discarded log while chasing the
+ *  transaction header chain.  To avoid this we first check whether the
+ *  transaction's next_urec has already been discarded; if so, there is no
+ *  need to access it, and we start executing from the last undo record in
+ *  the current log.
+ *
+ *  We only connect to the next log if the same transaction spills into it;
+ *  otherwise we don't.
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/undorecord.h"
+#include "access/undoinsert.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "miscadmin.h"
+#include "commands/tablecmds.h"
+
+/*
+ * XXX Do we want to support undo tuple sizes larger than BLCKSZ?  If not,
+ * an undo record can spread across at most 2 buffers.
+ */
+#define MAX_BUFFER_PER_UNDO    2
+
+/*
+ * This defines the number of undo records that can be prepared before
+ * calling insert by default.  If you need to prepare more than
+ * MAX_PREPARED_UNDO undo records, then you must call UndoSetPrepareSize
+ * first.
+ */
+#define MAX_PREPARED_UNDO 2
+
+/*
+ * One extra slot is reserved for the buffers needed to update the previous
+ * transaction's starting undo record.  Hence the +1.
+ */
+#define MAX_UNDO_BUFFERS       ((MAX_PREPARED_UNDO + 1) * MAX_BUFFER_PER_UNDO)
+
+/*
+ * Previous top transaction id which inserted the undo.  Whenever a new main
+ * transaction try to prepare an undo record we will check if its txid not the
+ * same as prev_txid then we will insert the start undo record.
+ */
+static TransactionId	prev_txid[UndoPersistenceLevels] = { 0 };
+
+/* Undo block number to buffer mapping. */
+typedef struct UndoBuffers
+{
+	UndoLogNumber	logno;			/* Undo log number */
+	BlockNumber		blk;			/* block number */
+	Buffer			buf;			/* buffer allocated for the block */
+	bool			zero;			/* new block full of zeroes */
+} UndoBuffers;
+
+static UndoBuffers def_buffers[MAX_UNDO_BUFFERS];
+static int	buffer_idx;
+
+/*
+ * Structure to hold the prepared undo information.
+ */
+typedef struct PreparedUndoSpace
+{
+	UndoRecPtr urp;						/* undo record pointer */
+	UnpackedUndoRecord *urec;			/* undo record */
+	int undo_buffer_idx[MAX_BUFFER_PER_UNDO]; /* undo_buffer array index */
+} PreparedUndoSpace;
+
+static PreparedUndoSpace  def_prepared[MAX_PREPARED_UNDO];
+static int prepare_idx;
+static int	max_prepare_undo = MAX_PREPARED_UNDO;
+static UndoRecPtr	multi_prep_urp = InvalidUndoRecPtr;
+static bool	update_prev_header = false;
+
+/*
+ * By default prepared_undo and undo_buffer point to static memory.  If the
+ * caller needs to prepare more than the default maximum number of undo
+ * records, the limit can be raised by calling UndoSetPrepareSize.  In that
+ * case dynamic memory is allocated, and prepared_undo and undo_buffer start
+ * pointing to the newly allocated memory, which is released by
+ * UnlockReleaseUndoBuffers; these variables are then set back to their
+ * default values.
+ */
+static PreparedUndoSpace *prepared_undo = def_prepared;
+static UndoBuffers *undo_buffer = def_buffers;
+
+/*
+ * Structure to hold the previous transaction's undo update information.
+ */
+typedef struct PreviousTxnUndoRecord
+{
+	UndoRecPtr	prev_urecptr; /* prev txn's starting urecptr */
+	int			prev_txn_undo_buffers[MAX_BUFFER_PER_UNDO];
+	UnpackedUndoRecord uur;	/* prev txn's first undo record. */
+} PreviousTxnInfo;
+
+static PreviousTxnInfo prev_txn_info;
+
+/* Prototypes for static functions. */
+static UnpackedUndoRecord* UndoGetOneRecord(UnpackedUndoRecord *urec,
+											UndoRecPtr urp, RelFileNode rnode,
+											UndoPersistence persistence);
+static void UndoRecordPrepareTransInfo(UndoRecPtr urecptr,
+											 bool log_switched);
+static void UndoRecordUpdateTransInfo(void);
+static int InsertFindBufferSlot(RelFileNode rnode, BlockNumber blk,
+								ReadBufferMode rbm,
+								UndoPersistence persistence);
+static bool UndoRecordIsValid(UndoLogControl *log,
+							  UndoRecPtr prev_xact_urp);
+
+/*
+ * Check whether the undo record has been discarded.  Return false if it has
+ * already been discarded, otherwise true.
+ *
+ * The caller must hold log->discard_lock.  If this function returns false, it
+ * releases the lock; otherwise the lock is still held on return and the
+ * caller must release it.
+ */
+static bool
+UndoRecordIsValid(UndoLogControl *log, UndoRecPtr prev_xact_urp)
+{
+	Assert(LWLockHeldByMeInMode(&log->discard_lock, LW_SHARED));
+
+	if (log->oldest_data == InvalidUndoRecPtr)
+	{
+		/*
+		 * oldest_data is only initialized when the discard worker first
+		 * attempts to discard undo logs, so we cannot rely on it to tell
+		 * whether the undo record pointer has already been discarded;
+		 * instead we ask the undo log layer.  If it is not yet discarded, we
+		 * must reacquire log->discard_lock so that it cannot be discarded
+		 * concurrently while we are looking at it.
+		 */
+		LWLockRelease(&log->discard_lock);
+		if (UndoLogIsDiscarded(prev_xact_urp))
+			return false;
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+	}
+
+	/* Check again if it's already discarded. */
+	if (prev_xact_urp < log->oldest_data)
+	{
+		LWLockRelease(&log->discard_lock);
+		return false;
+	}
+
+	return true;
+}
+
+/*
+ * Prepare to update the previous transaction's next undo pointer to maintain
+ * the transaction chain in the undo.  This will read the header of the first
+ * undo record of the previous transaction and lock the necessary buffers.
+ * The actual update will be done by UndoRecordUpdateTransInfo under the
+ * critical section.
+ */
+static void
+UndoRecordPrepareTransInfo(UndoRecPtr urecptr, bool log_switched)
+{
+	UndoRecPtr	prev_xact_urp;
+	Buffer		buffer = InvalidBuffer;
+	BlockNumber	cur_blk;
+	RelFileNode	rnode;
+	UndoLogNumber logno = UndoRecPtrGetLogNo(urecptr);
+	UndoLogControl *log;
+	Page		page;
+	int			already_decoded = 0;
+	int			starting_byte;
+	int			bufidx;
+	int			index = 0;
+
+	log = UndoLogGet(logno, false);
+
+	if (log_switched)
+	{
+		Assert(log->meta.prevlogno != InvalidUndoLogNumber);
+		log = UndoLogGet(log->meta.prevlogno, false);
+	}
+
+	/*
+	 * Temporary undo logs are discarded on transaction commit so we don't
+	 * need to do anything.
+	 */
+	if (log->meta.persistence == UNDO_TEMP)
+		return;
+
+	/*
+	 * We can read the previous transaction's location without locking,
+	 * because only the backend attached to the log can write to it (or we're
+	 * in recovery).
+	 */
+	Assert(AmAttachedToUndoLog(log) || InRecovery || log_switched);
+
+	if (log->meta.last_xact_start == 0)
+		prev_xact_urp = InvalidUndoRecPtr;
+	else
+		prev_xact_urp = MakeUndoRecPtr(log->logno, log->meta.last_xact_start);
+
+	/*
+	 * The absence of a previous transaction's undo indicates that this backend
+	 * is preparing its first undo, in which case there is nothing to update.
+	 */
+	if (!UndoRecPtrIsValid(prev_xact_urp))
+		return;
+
+	/*
+	 * Acquire the discard lock before accessing the undo record so that
+	 * discard worker doesn't remove the record while we are in process of
+	 * reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	if (!UndoRecordIsValid(log, prev_xact_urp))
+		return;
+
+	UndoRecPtrAssignRelFileNode(rnode, prev_xact_urp);
+	cur_blk = UndoRecPtrGetBlockNum(prev_xact_urp);
+	starting_byte = UndoRecPtrGetPageOffset(prev_xact_urp);
+
+	while (true)
+	{
+		bufidx = InsertFindBufferSlot(rnode, cur_blk,
+									  RBM_NORMAL,
+									  log->meta.persistence);
+		prev_txn_info.prev_txn_undo_buffers[index] = bufidx;
+		buffer = undo_buffer[bufidx].buf;
+		page = BufferGetPage(buffer);
+		index++;
+
+		if (UnpackUndoRecord(&prev_txn_info.uur, page, starting_byte,
+							 &already_decoded, true))
+			break;
+
+		starting_byte = UndoLogBlockHeaderSize;
+		cur_blk++;
+	}
+
+	prev_txn_info.uur.uur_next = urecptr;
+	prev_txn_info.prev_urecptr = prev_xact_urp;
+	LWLockRelease(&log->discard_lock);
+}
+
+/*
+ * Overwrite the first undo record of the previous transaction to update its
+ * next pointer.  This just writes out the record already prepared by
+ * UndoRecordPrepareTransInfo, and must be called inside the critical section.
+ * Only the undo record header is overwritten, not the data.
+ */
+static void
+UndoRecordUpdateTransInfo(void)
+{
+	UndoLogNumber logno = UndoRecPtrGetLogNo(prev_txn_info.prev_urecptr);
+	Page		page;
+	int			starting_byte;
+	int			already_written = 0;
+	int			idx = 0;
+	UndoRecPtr	prev_urp = InvalidUndoRecPtr;
+	UndoLogControl *log;
+
+	log = UndoLogGet(logno, false);
+	prev_urp = prev_txn_info.prev_urecptr;
+
+	/*
+	 * Acquire the discard lock before accessing the undo record so that
+	 * discard worker can't remove the record while we are in process of
+	 * reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	if (!UndoRecordIsValid(log, prev_urp))
+		return;
+
+	/*
+	 * Update the next transaction's start urecptr in the transaction
+	 * header.
+	 */
+	starting_byte = UndoRecPtrGetPageOffset(prev_urp);
+
+	do
+	{
+		Buffer  buffer;
+		int		buf_idx;
+
+		buf_idx = prev_txn_info.prev_txn_undo_buffers[idx];
+		buffer = undo_buffer[buf_idx].buf;
+		page = BufferGetPage(buffer);
+
+		/* Overwrite the previously written undo. */
+		if (InsertUndoRecord(&prev_txn_info.uur, page, starting_byte, &already_written, true))
+		{
+			MarkBufferDirty(buffer);
+			break;
+		}
+
+		MarkBufferDirty(buffer);
+		starting_byte = UndoLogBlockHeaderSize;
+		idx++;
+
+		Assert(idx < MAX_BUFFER_PER_UNDO);
+	} while(true);
+
+	LWLockRelease(&log->discard_lock);
+}
+
+/*
+ * Look up the block number in the undo buffer array.  If it is already
+ * present, just return its index; otherwise read the buffer, insert an entry
+ * for it into the array, and lock the buffer in exclusive mode.
+ *
+ * Undo log insertions are append-only.  If the caller is writing new data
+ * that begins exactly at the beginning of a page, then there cannot be any
+ * useful data after that point.  In that case RBM_ZERO can be passed in as
+ * rbm so that we can skip a useless read of a disk block.  In all other
+ * cases, RBM_NORMAL should be passed in, to read the page in if it doesn't
+ * happen to be already in the buffer pool.
+ */
+static int
+InsertFindBufferSlot(RelFileNode rnode,
+					 BlockNumber blk,
+					 ReadBufferMode rbm,
+					 UndoPersistence persistence)
+{
+	int 	i;
+	Buffer 	buffer;
+
+	/* Don't do anything, if we already have a buffer pinned for the block. */
+	for (i = 0; i < buffer_idx; i++)
+	{
+		/*
+		 * It's not enough to just compare the block number because the
+		 * undo_buffer might hold undo from different undo logs (e.g. when
+		 * the previous transaction's start header is in the previous undo
+		 * log), so compare both the logno and the blkno.
+		 */
+		if ((blk == undo_buffer[i].blk) &&
+			(undo_buffer[i].logno == rnode.relNode))
+		{
+			/* caller must hold exclusive lock on buffer */
+			Assert(BufferIsLocal(undo_buffer[i].buf) ||
+				   LWLockHeldByMeInMode(BufferDescriptorGetContentLock(
+										GetBufferDescriptor(undo_buffer[i].buf - 1)),
+										LW_EXCLUSIVE));
+			break;
+		}
+	}
+
+	/*
+	 * We did not find the block, so read the buffer and insert an entry for
+	 * it into the undo buffer array.
+	 */
+	if (i == buffer_idx)
+	{
+		/*
+		 * Fetch the buffer in which we want to insert the undo record.
+		 */
+		buffer = ReadBufferWithoutRelcache(rnode,
+										   UndoLogForkNum,
+										   blk,
+										   rbm,
+										   NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+
+		/* Lock the buffer */
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+
+		undo_buffer[buffer_idx].buf = buffer;
+		undo_buffer[buffer_idx].blk = blk;
+		undo_buffer[buffer_idx].logno = rnode.relNode;
+		undo_buffer[buffer_idx].zero = rbm == RBM_ZERO;
+		buffer_idx++;
+	}
+
+	return i;
+}
+
+/*
+ * Calculate the total size required by nrecords undo records and allocate
+ * the space for them in one go.  This is needed for operations that generate
+ * multiple undo records for a single WAL record, e.g. multi-insert.  If we
+ * did not allocate the undo space for all such records together, they could
+ * end up in different undo logs, and during recovery we currently have no
+ * mechanism to map an xid to multiple log numbers for one WAL record.  In
+ * short, all undo for one WAL record must be allocated from the same undo
+ * log.
+ */
+static UndoRecPtr
+UndoRecordAllocateMulti(UnpackedUndoRecord *undorecords, int nrecords,
+						UndoPersistence upersistence, TransactionId txid)
+{
+	UnpackedUndoRecord *urec;
+	UndoLogControl *log;
+	UndoRecordSize	size;
+	UndoRecPtr		urecptr;
+	bool	need_start_undo = false;
+	bool	first_rec_in_recovery;
+	bool	log_switched = false;
+	int	i;
+
+	/* There must be at least one undo record. */
+	if (nrecords <= 0)
+		elog(ERROR, "cannot allocate space for zero undo records");
+
+	/*
+	 * If this is the first undo record of this transaction, set uur_next to
+	 * SpecialUndoRecPtr.  That indicates that space for the transaction
+	 * header must be allocated; the real value of uur_next will be filled in
+	 * while preparing the first undo record of the next transaction.
+	 */
+	first_rec_in_recovery = InRecovery && IsTransactionFirstRec(txid);
+
+	if ((!InRecovery && prev_txid[upersistence] != txid) ||
+		first_rec_in_recovery)
+	{
+		need_start_undo = true;
+	}
+
+resize:
+	size = 0;
+
+	for (i = 0; i < nrecords; i++)
+	{
+		urec = undorecords + i;
+
+		if (need_start_undo && i == 0)
+		{
+			urec->uur_next = SpecialUndoRecPtr;
+			urec->uur_xidepoch = GetEpochForXid(txid);
+			urec->uur_progress = 0;
+
+			/* During recovery, fetch the database id from the undo log state. */
+			if (InRecovery)
+				urec->uur_dbid = UndoLogStateGetDatabaseId();
+			else
+				urec->uur_dbid = MyDatabaseId;
+		}
+		else
+		{
+			/*
+			 * It is okay to initialize these variables like this, as they
+			 * are used only with the first record of the transaction.
+			 */
+			urec->uur_next = InvalidUndoRecPtr;
+			urec->uur_xidepoch = 0;
+			urec->uur_dbid = 0;
+			urec->uur_progress = 0;
+		}
+
+
+		/* calculate the size of the undo record. */
+		size += UndoRecordExpectedSize(urec);
+	}
+
+	if (InRecovery)
+		urecptr = UndoLogAllocateInRecovery(txid, size, upersistence);
+	else
+		urecptr = UndoLogAllocate(size, upersistence);
+
+	log = UndoLogGet(UndoRecPtrGetLogNo(urecptr), false);
+	Assert(AmAttachedToUndoLog(log) || InRecovery);
+
+	/*
+	 * If this is the first record of the log but not the first record of the
+	 * transaction, i.e. the same transaction continued from the previous log.
+	 */
+	if ((UndoRecPtrGetOffset(urecptr) == UndoLogBlockHeaderSize) &&
+		log->meta.prevlogno != InvalidUndoLogNumber)
+		log_switched = true;
+
+	/*
+	 * If we've rewound all the way back to the start of the transaction by
+	 * rolling back the first subtransaction (which we can't detect until
+	 * after we've allocated some space), we'll need a new transaction header.
+	 * If we weren't already generating one, that will make the record larger,
+	 * so we'll have to go back and recompute the size.
+	 */
+	if (!need_start_undo &&
+		(log->meta.insert == log->meta.last_xact_start ||
+		 UndoRecPtrGetOffset(urecptr) == UndoLogBlockHeaderSize))
+	{
+		need_start_undo = true;
+		urec->uur_info = 0;		/* force recomputation of info bits */
+
+		goto resize;
+	}
+
+	/*
+	 * If the transaction id has changed, update the previous transaction's
+	 * start undo record.
+	 */
+	if (first_rec_in_recovery ||
+		(!InRecovery && prev_txid[upersistence] != txid) ||
+		log_switched)
+	{
+		/* Don't update our own start header. */
+		if (log->meta.last_xact_start != log->meta.insert)
+			UndoRecordPrepareTransInfo(urecptr, log_switched);
+
+		/* Remember the current transaction's xid. */
+		prev_txid[upersistence] = txid;
+
+		/* Store the current transaction's start undorecptr in the undo log. */
+		UndoLogSetLastXactStartPoint(urecptr);
+		update_prev_header = false;
+	}
+
+	UndoLogAdvance(urecptr, size, upersistence);
+
+	return urecptr;
+}
+
+/*
+ * Call UndoSetPrepareSize to set the maximum number of undo records that can
+ * be prepared before the prepared undo is inserted.  If max_prepare is
+ * greater than MAX_PREPARED_UNDO, extra memory is allocated to hold the
+ * additional prepared records.
+ */
+void
+UndoSetPrepareSize(int max_prepare, UnpackedUndoRecord *undorecords,
+				   TransactionId xid, UndoPersistence upersistence)
+{
+	TransactionId txid;
+
+	/* Get the top transaction id. */
+	if (xid == InvalidTransactionId)
+	{
+		Assert (!InRecovery);
+		txid = GetTopTransactionId();
+	}
+	else
+	{
+		Assert (InRecovery || (xid == GetTopTransactionId()));
+		txid = xid;
+	}
+
+	multi_prep_urp = UndoRecordAllocateMulti(undorecords, max_prepare,
+											 upersistence, txid);
+	if (max_prepare <= MAX_PREPARED_UNDO)
+		return;
+
+	prepared_undo = palloc0(max_prepare * sizeof(PreparedUndoSpace));
+
+	/*
+	 * One extra slot is reserved for the buffers needed to update the
+	 * previous transaction's starting undo record.  Hence the +1.
+	 */
+	undo_buffer = palloc0((max_prepare + 1) * MAX_BUFFER_PER_UNDO *
+						 sizeof(UndoBuffers));
+	max_prepare_undo = max_prepare;
+}
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record you
+ * intend to insert.  Upon return, the necessary undo buffers are pinned and
+ * locked.
+ * This should be done before any critical section is established, since it
+ * can fail.
+ *
+ * If not in recovery, 'xid' should refer to the top transaction id because
+ * the undo log only stores mappings for top-level transactions.
+ * If in recovery, 'xid' refers to the transaction id stored in the WAL.
+ */
+UndoRecPtr
+PrepareUndoInsert(UnpackedUndoRecord *urec, UndoPersistence upersistence,
+				  TransactionId xid)
+{
+	UndoRecordSize	size;
+	UndoRecPtr		urecptr;
+	RelFileNode		rnode;
+	UndoRecordSize  cur_size = 0;
+	BlockNumber		cur_blk;
+	TransactionId	txid;
+	int				starting_byte;
+	int				index = 0;
+	int				bufidx;
+	ReadBufferMode	rbm;
+
+	/* Already reached maximum prepared limit. */
+	if (prepare_idx == max_prepare_undo)
+		return InvalidUndoRecPtr;
+
+	/*
+	 * If this is the first undo record for this top transaction, add the
+	 * transaction information to the undo record.
+	 *
+	 * XXX Instead of adding the information to this record, we could prepare
+	 * a separate record that contains only the transaction information.
+	 */
+	if (xid == InvalidTransactionId)
+	{
+		/* During recovery we expect to always have a valid transaction id. */
+		Assert (!InRecovery);
+		txid = GetTopTransactionId();
+	}
+	else
+	{
+		/*
+		 * Assign the top transaction id because the undo log only stores
+		 * mappings for top-level transactions.
+		 */
+		Assert (InRecovery || (xid == GetTopTransactionId()));
+		txid = xid;
+	}
+
+	if (!UndoRecPtrIsValid(multi_prep_urp))
+		urecptr = UndoRecordAllocateMulti(urec, 1, upersistence, txid);
+	else
+		urecptr = multi_prep_urp;
+
+	size = UndoRecordExpectedSize(urec);
+	if (UndoRecPtrIsValid(multi_prep_urp))
+	{
+		UndoLogOffset insert = UndoRecPtrGetOffset(multi_prep_urp);
+		insert = UndoLogOffsetPlusUsableBytes(insert, size);
+		multi_prep_urp = MakeUndoRecPtr(UndoRecPtrGetLogNo(urecptr), insert);
+	}
+
+	cur_blk = UndoRecPtrGetBlockNum(urecptr);
+	UndoRecPtrAssignRelFileNode(rnode, urecptr);
+	starting_byte = UndoRecPtrGetPageOffset(urecptr);
+
+	/*
+	 * If we happen to be writing the very first byte into this page, then
+	 * there is no need to read from disk.
+	 */
+	if (starting_byte == UndoLogBlockHeaderSize)
+		rbm = RBM_ZERO;
+	else
+		rbm = RBM_NORMAL;
+
+	do
+	{
+		bufidx = InsertFindBufferSlot(rnode, cur_blk, rbm, upersistence);
+		if (cur_size == 0)
+			cur_size = BLCKSZ - starting_byte;
+		else
+			cur_size += BLCKSZ - UndoLogBlockHeaderSize;
+
+		/* FIXME: Should we just report an error? */
+		Assert(index < MAX_BUFFER_PER_UNDO);
+
+		/* Keep track of the buffers we have pinned. */
+		prepared_undo[prepare_idx].undo_buffer_idx[index++] = bufidx;
+
+		/* The undo record cannot fit into this block, so go to the next block. */
+		cur_blk++;
+
+		/*
+		 * If we need more pages they'll be all new so we can definitely skip
+		 * reading from disk.
+		 */
+		rbm = RBM_ZERO;
+	} while (cur_size < size);
+
+	/*
+	 * Save references to the undo record pointer as well as the undo record
+	 * itself.  InsertPreparedUndo will use these to insert the prepared record.
+	 */
+	prepared_undo[prepare_idx].urec = urec;
+	prepared_undo[prepare_idx].urp = urecptr;
+	prepare_idx++;
+
+	return urecptr;
+}
+
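+/*
+ * Register all the undo buffers pinned by the prepared undo records with the
+ * WAL record currently being assembled, starting at first_block_id.  Buffers
+ * for freshly zeroed pages are registered with REGBUF_WILL_INIT.
+ */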
+void
+RegisterUndoLogBuffers(uint8 first_block_id)
+{
+	int		idx;
+	int		flags;
+
+	for (idx = 0; idx < buffer_idx; idx++)
+	{
+		flags = undo_buffer[idx].zero ? REGBUF_WILL_INIT : 0;
+		XLogRegisterBuffer(first_block_id + idx, undo_buffer[idx].buf, flags);
+	}
+}
+
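+/*
+ * Set the given LSN on every undo buffer page touched by the prepared undo
+ * records.
+ */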
+void
+UndoLogBuffersSetLSN(XLogRecPtr recptr)
+{
+	int		idx;
+
+	for (idx = 0; idx < buffer_idx; idx++)
+		PageSetLSN(BufferGetPage(undo_buffer[idx].buf), recptr);
+}
+
+/*
+ * Insert a previously-prepared undo record.  This writes the actual undo
+ * record into the buffers already pinned and locked by PrepareUndoInsert,
+ * and marks them dirty.  For persistent undo, this step should be performed
+ * after entering a critical section; it should never fail.
+ */
+void
+InsertPreparedUndo(void)
+{
+	Page	page;
+	int		starting_byte;
+	int		already_written;
+	int		bufidx = 0;
+	int		idx;
+	uint16	undo_len = 0;
+	UndoRecPtr	urp;
+	UnpackedUndoRecord	*uur;
+	UndoLogOffset offset;
+	UndoLogControl *log;
+	uint16	prev_undolen;
+
+	Assert(prepare_idx > 0);
+
+	/* This must be called under a critical section. */
+	Assert(CritSectionCount > 0);
+
+	for (idx = 0; idx < prepare_idx; idx++)
+	{
+		uur = prepared_undo[idx].urec;
+		urp = prepared_undo[idx].urp;
+
+		already_written = 0;
+		bufidx = 0;
+		starting_byte = UndoRecPtrGetPageOffset(urp);
+		offset = UndoRecPtrGetOffset(urp);
+
+		/*
+		 * We can read meta.prevlen without locking, because only we can write
+		 * to it.
+		 */
+		log = UndoLogGet(UndoRecPtrGetLogNo(urp), false);
+		Assert(AmAttachedToUndoLog(log) || InRecovery);
+		prev_undolen = log->meta.prevlen;
+
+		/* store the previous undo record length in the header */
+		uur->uur_prevlen = prev_undolen;
+
+		/* if starting a new log then there is no prevlen to store */
+		if (offset == UndoLogBlockHeaderSize)
+		{
+			if (log->meta.prevlogno != InvalidUndoLogNumber)
+			{
+				UndoLogControl *prevlog = UndoLogGet(log->meta.prevlogno, false);
+				uur->uur_prevlen = prevlog->meta.prevlen;
+			}
+			else
+				uur->uur_prevlen = 0;
+		}
+
+		/* if starting from a new page then include header in prevlen */
+		else if (starting_byte == UndoLogBlockHeaderSize)
+				uur->uur_prevlen += UndoLogBlockHeaderSize;
+
+		undo_len = 0;
+
+		do
+		{
+			PreparedUndoSpace undospace = prepared_undo[idx];
+			Buffer  buffer;
+
+			buffer = undo_buffer[undospace.undo_buffer_idx[bufidx]].buf;
+			page = BufferGetPage(buffer);
+
+			/*
+			 * Initialize the page whenever we are about to write the first
+			 * record on it.
+			 */
+			if (starting_byte == UndoLogBlockHeaderSize)
+				PageInit(page, BLCKSZ, 0);
+
+			/*
+			 * Try to insert the record into the current page. If it doesn't
+			 * fit completely, call the routine again with the next page.
+			 */
+			if (InsertUndoRecord(uur, page, starting_byte, &already_written, false))
+			{
+				undo_len += already_written;
+				MarkBufferDirty(buffer);
+				break;
+			}
+
+			MarkBufferDirty(buffer);
+			starting_byte = UndoLogBlockHeaderSize;
+			bufidx++;
+
+			/*
+			 * If we are switching to the next block, include the block
+			 * header in the total undo length.
+			 */
+			undo_len += UndoLogBlockHeaderSize;
+
+			Assert(bufidx < MAX_BUFFER_PER_UNDO);
+		} while(true);
+
+		prev_undolen = undo_len;
+
+		UndoLogSetPrevLen(UndoRecPtrGetLogNo(urp), prev_undolen);
+
+		if (UndoRecPtrIsValid(prev_txn_info.prev_urecptr))
+			UndoRecordUpdateTransInfo();
+
+		/*
+		 * Set the current undo location for the transaction.  This is
+		 * required to perform rollback if the transaction aborts.
+		 */
+		SetCurrentUndoLocation(urp);
+	}
+}
+
+/*
+ * Unlock and release undo buffers.  This step is performed after exiting any
+ * critical section.
+ */
+void
+UnlockReleaseUndoBuffers(void)
+{
+	int	i;
+	for (i = 0; i < buffer_idx; i++)
+	{
+		UnlockReleaseBuffer(undo_buffer[i].buf);
+		undo_buffer[i].blk = InvalidBlockNumber;
+		undo_buffer[i].buf = InvalidBuffer;
+	}
+
+	prev_txn_info.prev_urecptr = InvalidUndoRecPtr;
+
+	/* Reset the prepared index. */
+	prepare_idx = 0;
+	buffer_idx = 0;
+	multi_prep_urp = InvalidUndoRecPtr;
+
+	/*
+	 * If the max_prepare_undo limit was raised, free the allocated memory and
+	 * reset all the variables back to their default values.
+	 */
+	if (max_prepare_undo > MAX_PREPARED_UNDO)
+	{
+		pfree(undo_buffer);
+		pfree(prepared_undo);
+		undo_buffer = def_buffers;
+		prepared_undo = def_prepared;
+		max_prepare_undo = MAX_PREPARED_UNDO;
+	}
+}
+
+/*
+ * Helper function for UndoFetchRecord.  It fetches the undo record pointed to
+ * by urp and unpacks it into urec.  If the complete record was fetched from a
+ * single buffer, the pin on that buffer is not released, so the caller can
+ * reuse the same urec to fetch another undo record from the same block.  The
+ * caller is responsible for releasing the buffer stored inside urec and
+ * setting it to invalid before fetching a record from a different block.
+ */
+static UnpackedUndoRecord*
+UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence)
+{
+	Buffer			 buffer = urec->uur_buffer;
+	Page			 page;
+	int				 starting_byte = UndoRecPtrGetPageOffset(urp);
+	int				 already_decoded = 0;
+	BlockNumber		 cur_blk;
+	bool			 is_undo_splited = false;
+
+	cur_blk = UndoRecPtrGetBlockNum(urp);
+
+	/* If we already have a previous buffer then no need to allocate new. */
+	if (!BufferIsValid(buffer))
+	{
+		buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, cur_blk,
+										   RBM_NORMAL, NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+
+		urec->uur_buffer = buffer;
+	}
+
+	while (true)
+	{
+		LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		page = BufferGetPage(buffer);
+
+		/*
+		 * FIXME: This can be optimized to fetch only the header first, and
+		 * fetch the complete record only if the block number and offset
+		 * match.
+		 */
+		if (UnpackUndoRecord(urec, page, starting_byte, &already_decoded, false))
+			break;
+
+		starting_byte = UndoLogBlockHeaderSize;
+		is_undo_splited = true;
+
+		/*
+		 * The complete record does not fit into one buffer, so release the
+		 * buffer pin and set the buffer in the undo record to invalid.
+		 */
+		urec->uur_buffer = InvalidBuffer;
+		UnlockReleaseBuffer(buffer);
+
+		/* Go to next block. */
+		cur_blk++;
+		buffer = ReadBufferWithoutRelcache(rnode, UndoLogForkNum, cur_blk,
+										   RBM_NORMAL, NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+	}
+
+	/*
+	 * If we have copied the data then release the buffer. Otherwise just
+	 * unlock it.
+	 */
+	if (is_undo_splited)
+		UnlockReleaseBuffer(buffer);
+	else
+		LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
+
+	return urec;
+}
+
+/*
+ * Fetch the next undo record for given blkno, offset and transaction id (if
+ * valid).  We need to match transaction id along with block number and offset
+ * because in some cases (like reuse of slot for committed transaction), we
+ * need to skip the record if it is modified by a transaction later than the
+ * transaction indicated by previous undo record.  For example, consider a
+ * case where tuple (ctid - 0,1) is modified by transaction id 500 which
+ * belongs to transaction slot 0. Then, the same tuple is modified by
+ * transaction id 501 which belongs to transaction slot 1.  Then, both the
+ * transaction slots are marked for reuse. Then, again the same tuple is
+ * modified by transaction id 502 which has used slot 0.  Now, some
+ * transaction which has started before transaction 500 wants to traverse the
+ * chain to find visible tuple will keep on rotating infinitely between undo
+ * tuple written by 502 and 501.  In such a case, we need to skip the undo
+ * tuple written by transaction 502 when we want to find the undo record
+ * indicated by the previous pointer of undo tuple written by transaction 501.
+ * Start the search from urp.  Caller need to call UndoRecordRelease to release the
+ * resources allocated by this function.
+ *
+ * *urec_ptr_out is set to the undo record pointer of the qualifying undo
+ * record, if a valid pointer is passed in.
+ */
+UnpackedUndoRecord*
+UndoFetchRecord(UndoRecPtr urp, BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback)
+{
+	RelFileNode		 rnode, prevrnode = {0};
+	UnpackedUndoRecord *urec = NULL;
+	int	logno;
+
+	if (urec_ptr_out)
+		*urec_ptr_out = InvalidUndoRecPtr;
+
+	urec = palloc0(sizeof(UnpackedUndoRecord));
+
+	/* Find the undo record pointer we are interested in. */
+	while (true)
+	{
+		UndoLogControl *log;
+
+		UndoRecPtrAssignRelFileNode(rnode, urp);
+
+		/*
+		 * If we have a valid buffer pinned, check whether the next record we
+		 * want is in the same block.  Otherwise release the buffer and set
+		 * it to invalid.
+		 */
+		if (BufferIsValid(urec->uur_buffer))
+		{
+			/*
+			 * Undo buffer will be changed if the next undo record belongs to a
+			 * different block or undo log.
+			 */
+			if (UndoRecPtrGetBlockNum(urp) != BufferGetBlockNumber(urec->uur_buffer) ||
+				(prevrnode.relNode != rnode.relNode))
+			{
+				ReleaseBuffer(urec->uur_buffer);
+				urec->uur_buffer = InvalidBuffer;
+			}
+		}
+		else
+		{
+			/*
+			 * If there is no valid buffer in urec->uur_buffer, it means we
+			 * copied the payload and tuple data, so free them.
+			 */
+			if (urec->uur_payload.data)
+				pfree(urec->uur_payload.data);
+			if (urec->uur_tuple.data)
+				pfree(urec->uur_tuple.data);
+		}
+
+		/* Reset the urec before fetching the tuple */
+		urec->uur_tuple.data = NULL;
+		urec->uur_tuple.len = 0;
+		urec->uur_payload.data = NULL;
+		urec->uur_payload.len = 0;
+		prevrnode = rnode;
+
+		logno = UndoRecPtrGetLogNo(urp);
+		log = UndoLogGet(logno, false);
+		if (log == NULL)
+		{
+			if (BufferIsValid(urec->uur_buffer))
+				ReleaseBuffer(urec->uur_buffer);
+			return NULL;
+		}
+
+		/*
+		 * Prevent UndoDiscardOneLog() from discarding data while we try to
+		 * read it.  Usually we would acquire log->mutex to read log->meta
+		 * members, but in this case we know that discard can't move without
+		 * also holding log->discard_lock.
+		 */
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+		if (!UndoRecPtrIsValid(log->oldest_data))
+		{
+			/*
+			 * UndoDiscardInfo is not yet initialized.  Hence, we have to
+			 * check UndoLogIsDiscarded, and if the pointer is already
+			 * discarded then we have nothing to do.
+			 */
+			LWLockRelease(&log->discard_lock);
+			if (UndoLogIsDiscarded(urp))
+			{
+				if (BufferIsValid(urec->uur_buffer))
+					ReleaseBuffer(urec->uur_buffer);
+				return NULL;
+			}
+
+			LWLockAcquire(&log->discard_lock, LW_SHARED);
+		}
+
+		/* Check if it's already discarded. */
+		if (urp < log->oldest_data)
+		{
+			LWLockRelease(&log->discard_lock);
+			if (BufferIsValid(urec->uur_buffer))
+				ReleaseBuffer(urec->uur_buffer);
+			return NULL;
+		}
+
+		/* Fetch the current undo record. */
+		urec = UndoGetOneRecord(urec, urp, rnode, log->meta.persistence);
+		LWLockRelease(&log->discard_lock);
+
+		if (blkno == InvalidBlockNumber)
+			break;
+
+		/* Check whether the undorecord satisfies conditions */
+		if (callback(urec, blkno, offset, xid))
+			break;
+
+		urp = urec->uur_blkprev;
+	}
+
+	if (urec_ptr_out)
+		*urec_ptr_out = urp;
+	return urec;
+}
+
+/*
+ * Return the previous undo record pointer.
+ *
+ * This API can switch to the previous log if the current log is exhausted,
+ * so the caller shouldn't use it where that is not expected.
+ */
+UndoRecPtr
+UndoGetPrevUndoRecptr(UndoRecPtr urp, uint16 prevlen)
+{
+	UndoLogNumber logno = UndoRecPtrGetLogNo(urp);
+	UndoLogOffset offset = UndoRecPtrGetOffset(urp);
+
+	/*
+	 * We have reached the first undo record of this undo log, so fetch the
+	 * previous undo record of the transaction from the previous log.
+	 */
+	if (offset == UndoLogBlockHeaderSize)
+	{
+		UndoLogControl	*prevlog, *log;
+
+		log = UndoLogGet(logno, false);
+
+		Assert(log->meta.prevlogno != InvalidUndoLogNumber);
+
+		/* Fetch the previous log control. */
+		prevlog = UndoLogGet(log->meta.prevlogno, true);
+		logno = log->meta.prevlogno;
+		offset = prevlog->meta.insert;
+	}
+
+	/* calculate the previous undo record pointer */
+	return MakeUndoRecPtr(logno, offset - prevlen);
+}
+
+/*
+ * Release the resources allocated by UndoFetchRecord.
+ */
+void
+UndoRecordRelease(UnpackedUndoRecord *urec)
+{
+	/*
+	 * If the undo record has a valid buffer then just release the buffer
+	 * otherwise free the tuple and payload data.
+	 */
+	if (BufferIsValid(urec->uur_buffer))
+	{
+		ReleaseBuffer(urec->uur_buffer);
+	}
+	else
+	{
+		if (urec->uur_payload.data)
+			pfree(urec->uur_payload.data);
+		if (urec->uur_tuple.data)
+			pfree(urec->uur_tuple.data);
+	}
+
+	pfree(urec);
+}
+
+/*
+ * Called whenever we attach to a new undo log, so that we forget about our
+ * translation-unit private state relating to the log we were last attached
+ * to.
+ */
+void
+UndoRecordOnUndoLogChange(UndoPersistence persistence)
+{
+	prev_txid[persistence] = InvalidTransactionId;
+}
diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c
new file mode 100644
index 0000000..33bb153
--- /dev/null
+++ b/src/backend/access/undo/undorecord.c
@@ -0,0 +1,459 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.c
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorecord.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/undorecord.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+
+/* Workspace for InsertUndoRecord and UnpackUndoRecord. */
+static UndoRecordHeader work_hdr;
+static UndoRecordRelationDetails work_rd;
+static UndoRecordBlock work_blk;
+static UndoRecordTransaction work_txn;
+static UndoRecordPayload work_payload;
+
+/* Prototypes for static functions. */
+static void UndoRecordSetInfo(UnpackedUndoRecord *uur);
+static bool InsertUndoBytes(char *sourceptr, int sourcelen,
+				char **writeptr, char *endptr,
+				int *my_bytes_written, int *total_bytes_written);
+static bool ReadUndoBytes(char *destptr, int readlen,
+			  char **readptr, char *endptr,
+			  int *my_bytes_read, int *total_bytes_read, bool nocopy);
+
+/*
+ * Compute and return the expected size of an undo record.
+ */
+Size
+UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+{
+	Size	size;
+
+	/* FIXME: Temporary hack to allow zheap to set some value for uur_info. */
+	/* if (uur->uur_info == 0) */
+		UndoRecordSetInfo(uur);
+
+	size = SizeOfUndoRecordHeader;
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0)
+		size += SizeOfUndoRecordRelationDetails;
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+		size += SizeOfUndoRecordBlock;
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+		size += SizeOfUndoRecordTransaction;
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		size += SizeOfUndoRecordPayload;
+		size += uur->uur_payload.len;
+		size += uur->uur_tuple.len;
+	}
+
+	return size;
+}
+
+/*
+ * Insert as much of an undo record as will fit in the given page.
+ * starting_byte is the byte within the given page at which to begin
+ * writing, while *already_written is the number of bytes written to
+ * previous pages.  Returns true if the remainder of the record was
+ * written and false if more bytes remain to be written; in either
+ * case, *already_written is set to the number of bytes written thus
+ * far.
+ *
+ * This function assumes that if *already_written is non-zero on entry,
+ * the same UnpackedUndoRecord is passed each time.  It also assumes
+ * that UnpackUndoRecord is not called between successive calls to
+ * InsertUndoRecord for the same UnpackedUndoRecord.
+ */
+bool
+InsertUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_written, bool header_only)
+{
+	char   *writeptr = (char *) page + starting_byte;
+	char   *endptr = (char *) page + BLCKSZ;
+	int		my_bytes_written = *already_written;
+
+	if (uur->uur_info == 0)
+		UndoRecordSetInfo(uur);
+
+	/*
+	 * If this is the first call, copy the UnpackedUndoRecord into the
+	 * temporary variables of the types that will actually be stored in the
+	 * undo pages.  We just initialize everything here, on the assumption
+	 * that it's not worth adding branches to save a handful of assignments.
+	 */
+	if (*already_written == 0)
+	{
+		work_hdr.urec_type = uur->uur_type;
+		work_hdr.urec_info = uur->uur_info;
+		work_hdr.urec_prevlen = uur->uur_prevlen;
+		work_hdr.urec_relfilenode = uur->uur_relfilenode;
+		work_hdr.urec_prevxid = uur->uur_prevxid;
+		work_hdr.urec_xid = uur->uur_xid;
+		work_hdr.urec_cid = uur->uur_cid;
+		work_rd.urec_tsid = uur->uur_tsid;
+		work_rd.urec_fork = uur->uur_fork;
+		work_blk.urec_blkprev = uur->uur_blkprev;
+		work_blk.urec_block = uur->uur_block;
+		work_blk.urec_offset = uur->uur_offset;
+		work_txn.urec_next = uur->uur_next;
+		work_txn.urec_xidepoch = uur->uur_xidepoch;
+		work_txn.urec_progress = uur->uur_progress;
+		work_txn.urec_dbid = uur->uur_dbid;
+		work_payload.urec_payload_len = uur->uur_payload.len;
+		work_payload.urec_tuple_len = uur->uur_tuple.len;
+	}
+	else
+	{
+		/*
+		 * We should have been passed the same record descriptor as before,
+		 * or caller has messed up.
+		 */
+		Assert(work_hdr.urec_type == uur->uur_type);
+		Assert(work_hdr.urec_info == uur->uur_info);
+		Assert(work_hdr.urec_prevlen == uur->uur_prevlen);
+		Assert(work_hdr.urec_relfilenode == uur->uur_relfilenode);
+		Assert(work_hdr.urec_prevxid == uur->uur_prevxid);
+		Assert(work_hdr.urec_xid == uur->uur_xid);
+		Assert(work_hdr.urec_cid == uur->uur_cid);
+		Assert(work_rd.urec_tsid == uur->uur_tsid);
+		Assert(work_rd.urec_fork == uur->uur_fork);
+		Assert(work_blk.urec_blkprev == uur->uur_blkprev);
+		Assert(work_blk.urec_block == uur->uur_block);
+		Assert(work_blk.urec_offset == uur->uur_offset);
+		Assert(work_txn.urec_next == uur->uur_next);
+		Assert(work_txn.urec_progress == uur->uur_progress);
+		Assert(work_txn.urec_dbid == uur->uur_dbid);
+		Assert(work_payload.urec_payload_len == uur->uur_payload.len);
+		Assert(work_payload.urec_tuple_len == uur->uur_tuple.len);
+	}
+
+	/* Write header (if not already done). */
+	if (!InsertUndoBytes((char *) &work_hdr, SizeOfUndoRecordHeader,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write relation details (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0 &&
+		!InsertUndoBytes((char *) &work_rd, SizeOfUndoRecordRelationDetails,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write block information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0 &&
+		!InsertUndoBytes((char *) &work_blk, SizeOfUndoRecordBlock,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	/* Write transaction information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0 &&
+		!InsertUndoBytes((char *) &work_txn, SizeOfUndoRecordTransaction,
+						 &writeptr, endptr,
+						 &my_bytes_written, already_written))
+		return false;
+
+	if (header_only)
+		return true;
+
+	/* Write payload information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		/* Payload header. */
+		if (!InsertUndoBytes((char *) &work_payload, SizeOfUndoRecordPayload,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+
+		/* Payload bytes. */
+		if (uur->uur_payload.len > 0 &&
+			!InsertUndoBytes(uur->uur_payload.data, uur->uur_payload.len,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+
+		/* Tuple bytes. */
+		if (uur->uur_tuple.len > 0 &&
+			!InsertUndoBytes(uur->uur_tuple.data, uur->uur_tuple.len,
+							 &writeptr, endptr,
+							 &my_bytes_written, already_written))
+			return false;
+	}
+
+	/* Hooray! */
+	return true;
+}
+
+/*
+ * Write undo bytes from a particular source, but only to the extent that
+ * they weren't written previously and will fit.
+ *
+ * 'sourceptr' points to the source data, and 'sourcelen' is the length of
+ * that data in bytes.
+ *
+ * 'writeptr' points to the insertion point for these bytes, and is updated
+ * for whatever we write.  The insertion point must not pass 'endptr', which
+ * represents the end of the buffer into which we are writing.
+ *
+ * 'my_bytes_written' is a pointer to the count of previously-written bytes
+ * from this and following structures in this undo record; that is, any
+ * bytes that are part of previous structures in the record have already
+ * been subtracted out.  We must update it for the bytes we write.
+ *
+ * 'total_bytes_written' points to the count of all previously-written bytes,
+ * and must likewise be updated for the bytes we write.
+ *
+ * The return value is false if we ran out of space before writing all
+ * the bytes, and otherwise true.
+ */
+static bool
+InsertUndoBytes(char *sourceptr, int sourcelen,
+				char **writeptr, char *endptr,
+				int *my_bytes_written, int *total_bytes_written)
+{
+	int		can_write;
+	int		remaining;
+
+	/*
+	 * If we've previously written all of these bytes, there's nothing
+	 * to do except update *my_bytes_written, which we must do to ensure
+	 * that the next call to this function gets the right starting value.
+	 */
+	if (*my_bytes_written >= sourcelen)
+	{
+		*my_bytes_written -= sourcelen;
+		return true;
+	}
+
+	/* Compute number of bytes we can write. */
+	remaining = sourcelen - *my_bytes_written;
+	can_write = Min(remaining, endptr - *writeptr);
+
+	/* Bail out if no bytes can be written. */
+	if (can_write == 0)
+		return false;
+
+	/* Copy the bytes we can write. */
+	memcpy(*writeptr, sourceptr + *my_bytes_written, can_write);
+
+	/* Update bookkeeping information. */
+	*writeptr += can_write;
+	*total_bytes_written += can_write;
+	*my_bytes_written = 0;
+
+	/* Return true only if we wrote the whole thing. */
+	return (can_write == remaining);
+}
+
+/*
+ * Call UnpackUndoRecord() one or more times to unpack an undo record.  For
+ * the first call, starting_byte should be set to the beginning of the undo
+ * record within the specified page, and *already_decoded should be set to 0;
+ * the function will update it based on the number of bytes decoded.  The
+ * return value is true if the entire record was unpacked and false if the
+ * record continues on the next page.  In the latter case, the function
+ * should be called again with the next page, passing sizeof(PageHeaderData)
+ * as the starting_byte.
+ */
+bool
+UnpackUndoRecord(UnpackedUndoRecord *uur, Page page, int starting_byte,
+				 int *already_decoded, bool header_only)
+{
+	char	*readptr = (char *)page + starting_byte;
+	char	*endptr = (char *) page + BLCKSZ;
+	int		my_bytes_decoded = *already_decoded;
+	bool	is_undo_splited = (my_bytes_decoded > 0) ? true : false;
+
+	/* Decode header (if not already done). */
+	if (!ReadUndoBytes((char *) &work_hdr, SizeOfUndoRecordHeader,
+					   &readptr, endptr,
+					   &my_bytes_decoded, already_decoded, false))
+		return false;
+
+	uur->uur_type = work_hdr.urec_type;
+	uur->uur_info = work_hdr.urec_info;
+	uur->uur_prevlen = work_hdr.urec_prevlen;
+	uur->uur_relfilenode = work_hdr.urec_relfilenode;
+	uur->uur_prevxid = work_hdr.urec_prevxid;
+	uur->uur_xid = work_hdr.urec_xid;
+	uur->uur_cid = work_hdr.urec_cid;
+
+	if ((uur->uur_info & UREC_INFO_RELATION_DETAILS) != 0)
+	{
+		/* Decode relation details (if not already done). */
+		if (!ReadUndoBytes((char *) &work_rd, SizeOfUndoRecordRelationDetails,
+							&readptr, endptr,
+							&my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_tsid = work_rd.urec_tsid;
+		uur->uur_fork = work_rd.urec_fork;
+	}
+
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_blk, SizeOfUndoRecordBlock,
+							&readptr, endptr,
+							&my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_blkprev = work_blk.urec_blkprev;
+		uur->uur_block = work_blk.urec_block;
+		uur->uur_offset = work_blk.urec_offset;
+	}
+
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_txn, SizeOfUndoRecordTransaction,
+							&readptr, endptr,
+							&my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_next = work_txn.urec_next;
+		uur->uur_xidepoch = work_txn.urec_xidepoch;
+		uur->uur_progress = work_txn.urec_progress;
+		uur->uur_dbid = work_txn.urec_dbid;
+	}
+
+	if (header_only)
+		return true;
+
+	/* Read payload information (if needed and not already done). */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		if (!ReadUndoBytes((char *) &work_payload, SizeOfUndoRecordPayload,
+							&readptr, endptr,
+							&my_bytes_decoded, already_decoded, false))
+			return false;
+
+		uur->uur_payload.len = work_payload.urec_payload_len;
+		uur->uur_tuple.len = work_payload.urec_tuple_len;
+
+		/*
+		 * If we can read the complete record from a single page, just point
+		 * the payload data and tuple data into the page; otherwise allocate
+		 * memory for them.
+		 *
+		 * XXX As an optimization, instead of always allocating memory when
+		 * the record is split, we could check whether the payload or the
+		 * tuple data still falls entirely within this page and avoid
+		 * allocating memory for that part.
+		 */
+		if (!is_undo_splited &&
+			uur->uur_payload.len + uur->uur_tuple.len <= (endptr - readptr))
+		{
+			uur->uur_payload.data = readptr;
+			readptr += uur->uur_payload.len;
+
+			uur->uur_tuple.data = readptr;
+		}
+		else
+		{
+			if (uur->uur_payload.len > 0 && uur->uur_payload.data == NULL)
+				uur->uur_payload.data = (char *) palloc0(uur->uur_payload.len);
+
+			if (uur->uur_tuple.len > 0 && uur->uur_tuple.data == NULL)
+				uur->uur_tuple.data = (char *) palloc0(uur->uur_tuple.len);
+
+			if (!ReadUndoBytes((char *) uur->uur_payload.data,
+							   uur->uur_payload.len, &readptr, endptr,
+							   &my_bytes_decoded, already_decoded, false))
+				return false;
+
+			if (!ReadUndoBytes((char *) uur->uur_tuple.data,
+							   uur->uur_tuple.len, &readptr, endptr,
+							   &my_bytes_decoded, already_decoded, false))
+				return false;
+		}
+	}
+
+	return true;
+}
+
+/*
+ * Read undo bytes into a particular destination.
+ *
+ * 'destptr' points to the destination buffer, and 'readlen' is the number of
+ * bytes to read into it.
+ *
+ * 'readptr' points to the read position for these bytes, and is updated for
+ * however much we read.  The read position must not pass 'endptr', which
+ * represents the end of the buffer from which we are reading.
+ *
+ * 'my_bytes_read' is a pointer to the count of previously-read bytes
+ * from this and following structures in this undo record; that is, any
+ * bytes that are part of previous structures in the record have already
+ * been subtracted out.  We must update it for the bytes we read.
+ *
+ * 'total_bytes_read' points to the count of all previously-read bytes,
+ * and must likewise be updated for the bytes we read.
+ *
+ * If 'nocopy' is true, the readlen bytes are skipped over (the read position
+ * is advanced) but nothing is copied into the destination buffer.
+ *
+ * The return value is false if we ran out of space before reading all
+ * the bytes, and otherwise true.
+ */
+static bool
+ReadUndoBytes(char *destptr, int readlen, char **readptr, char *endptr,
+			  int *my_bytes_read, int *total_bytes_read, bool nocopy)
+{
+	int		can_read;
+	int		remaining;
+
+	if (*my_bytes_read >= readlen)
+	{
+		*my_bytes_read -= readlen;
+		return true;
+	}
+
+	/* Compute number of bytes we can read. */
+	remaining = readlen - *my_bytes_read;
+	can_read = Min(remaining, endptr - *readptr);
+
+	/* Bail out if no bytes can be read. */
+	if (can_read == 0)
+		return false;
+
+	/* Copy the bytes we can read. */
+	if (!nocopy)
+		memcpy(destptr + *my_bytes_read, *readptr, can_read);
+
+	/* Update bookkeeping information. */
+	*readptr += can_read;
+	*total_bytes_read += can_read;
+	*my_bytes_read = 0;
+
+	/* Return true only if we read the whole thing. */
+	return (can_read == remaining);
+}
+
+/*
+ * Set uur_info for an UnpackedUndoRecord appropriately based on which
+ * other fields are set.
+ */
+static void
+UndoRecordSetInfo(UnpackedUndoRecord *uur)
+{
+	if (uur->uur_tsid != DEFAULTTABLESPACE_OID ||
+		uur->uur_fork != MAIN_FORKNUM)
+		uur->uur_info |= UREC_INFO_RELATION_DETAILS;
+	if (uur->uur_block != InvalidBlockNumber)
+		uur->uur_info |= UREC_INFO_BLOCK;
+	if (uur->uur_next != InvalidUndoRecPtr)
+		uur->uur_info |= UREC_INFO_TRANSACTION;
+	if (uur->uur_payload.len || uur->uur_tuple.len)
+		uur->uur_info |= UREC_INFO_PAYLOAD;
+}
diff --git a/src/include/access/undoinsert.h b/src/include/access/undoinsert.h
new file mode 100644
index 0000000..a2bf7cc
--- /dev/null
+++ b/src/include/access/undoinsert.h
@@ -0,0 +1,106 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoinsert.h
+ *	  entry points for inserting undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoinsert.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOINSERT_H
+#define UNDOINSERT_H
+
+#include "access/undolog.h"
+#include "access/undorecord.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+
+/*
+ * Typedef for callback function for UndoFetchRecord.
+ *
+ * This checks whether an undo record satisfies the given conditions.
+ */
+typedef bool (*SatisfyUndoRecordCallback) (UnpackedUndoRecord* urec,
+											BlockNumber blkno,
+											OffsetNumber offset,
+											TransactionId xid);
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record you
+ * intend to insert.  Upon return, the necessary undo buffers are pinned and
+ * locked.
+ * This should be done before any critical section is established, since it
+ * can fail.
+ *
+ * If not in recovery, 'xid' should refer to the top transaction id, because
+ * the undo log only stores the mapping for top-level transactions.
+ * If in recovery, 'xid' refers to the transaction id stored in WAL.
+ */
+extern UndoRecPtr PrepareUndoInsert(UnpackedUndoRecord *, UndoPersistence,
+					TransactionId xid);
+
+/*
+ * Insert a previously-prepared undo record.  This will write the actual undo
+ * record into the buffers already pinned and locked by PrepareUndoInsert,
+ * and mark them dirty.  For persistent undo, this step should be performed
+ * after entering a critical section; it should never fail.
+ */
+extern void InsertPreparedUndo(void);
+
+extern void RegisterUndoLogBuffers(uint8 first_block_id);
+extern void UndoLogBuffersSetLSN(XLogRecPtr recptr);
+
+/*
+ * Unlock and release undo buffers.  This step is performed after exiting any
+ * critical section.
+ */
+extern void UnlockReleaseUndoBuffers(void);
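+
+/*
+ * A minimal sketch of the prepare/insert cycle (illustrative only; the
+ * record contents, the WAL record, and the UNDO_PERMANENT persistence level
+ * shown here are assumptions of the example, not requirements of this
+ * interface):
+ *
+ *		UnpackedUndoRecord undorecord = {0};
+ *		UndoRecPtr	urecptr;
+ *
+ *		undorecord.uur_type = ...;		(and the other uur_* fields)
+ *		urecptr = PrepareUndoInsert(&undorecord, UNDO_PERMANENT,
+ *									GetTopTransactionId());
+ *
+ *		START_CRIT_SECTION();
+ *		InsertPreparedUndo();
+ *		... emit the corresponding WAL record ...
+ *		END_CRIT_SECTION();
+ *
+ *		UnlockReleaseUndoBuffers();
+ */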
+
+/*
+ * Forget about any previously-prepared undo record.  Error recovery calls
+ * this, but it can also be used by other code that changes its mind about
+ * inserting undo after having prepared a record for insertion.
+ */
+extern void CancelPreparedUndo(void);
+
+/*
+ * Fetch the next undo record for the given block number and offset, starting
+ * the search from 'urp'.  The caller must call UndoRecordRelease to release
+ * the resources allocated by this function.
+ */
+extern UnpackedUndoRecord* UndoFetchRecord(UndoRecPtr urp,
+										   BlockNumber blkno,
+										   OffsetNumber offset,
+										   TransactionId xid,
+										   UndoRecPtr *urec_ptr_out,
+										   SatisfyUndoRecordCallback callback);
+/*
+ * Release the resources allocated by UndoFetchRecord.
+ */
+extern void UndoRecordRelease(UnpackedUndoRecord *urec);
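+
+/*
+ * A minimal sketch of a fetch (illustrative only; the variables and the
+ * callback name used here are hypothetical, not part of this interface):
+ *
+ *		UndoRecPtr	urec_ptr_out;
+ *		UnpackedUndoRecord *uur;
+ *
+ *		uur = UndoFetchRecord(urp, blkno, offset, xid, &urec_ptr_out,
+ *							  my_satisfies_callback);
+ *		if (uur != NULL)
+ *		{
+ *			... examine uur->uur_payload, uur->uur_tuple, etc. ...
+ *			UndoRecordRelease(uur);
+ *		}
+ */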
+
+/*
+ * Set the value of PrevUndoLen.
+ */
+extern void UndoRecordSetPrevUndoLen(uint16 len);
+
+/*
+ * Call UndoSetPrepareSize to set the maximum number of undo records that can
+ * be prepared before the prepared undo is inserted.  If the size is greater
+ * than MAX_PREPARED_UNDO, extra memory is allocated to hold the additional
+ * prepared undo records.
+ */
+extern void UndoSetPrepareSize(int max_prepare, UnpackedUndoRecord *undorecords,
+							   TransactionId xid, UndoPersistence upersistence);
+
+/*
+ * Return the previous undo record pointer.
+ */
+extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, uint16 prevlen);
+
+extern void UndoRecordOnUndoLogChange(UndoPersistence persistence);
+
+
+#endif   /* UNDOINSERT_H */
diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h
new file mode 100644
index 0000000..85642ad
--- /dev/null
+++ b/src/include/access/undorecord.h
@@ -0,0 +1,216 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.h
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorecord.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDORECORD_H
+#define UNDORECORD_H
+
+#include "access/undolog.h"
+#include "lib/stringinfo.h"
+#include "storage/block.h"
+#include "storage/bufpage.h"
+#include "storage/buf.h"
+#include "storage/off.h"
+
+
+/*
+ * Every undo record begins with an UndoRecordHeader structure, which is
+ * followed by the additional structures indicated by the contents of
+ * urec_info.  All structures are packed together without any alignment
+ * padding bytes, and the undo record itself need not be aligned either, so
+ * care must be taken when reading the header.
+ */
+typedef struct UndoRecordHeader
+{
+	uint8		urec_type;		/* record type code */
+	uint8		urec_info;		/* flag bits */
+	uint16		urec_prevlen;	/* length of previous record in bytes */
+	Oid			urec_relfilenode;		/* relfilenode for relation */
+	/*
+	 * Transaction id that has modified the tuple present in this undo record.
+	 * If this is older than RecentGlobalXmin, then we can consider the tuple
+	 * in this undo record as visible.
+	 */
+	TransactionId urec_prevxid;
+	/*
+	 * Transaction id that has modified the tuple for which this undo record
+	 * is written.  We use this to skip the undo records.  See comments atop
+	 * function UndoFetchRecord.
+	 */
+	TransactionId urec_xid;			/* Transaction id */
+	CommandId	urec_cid;			/* command id */
+} UndoRecordHeader;
+
+#define SizeOfUndoRecordHeader	\
+	(offsetof(UndoRecordHeader, urec_cid) + sizeof(CommandId))
+
+/*
+ * If UREC_INFO_RELATION_DETAILS is set, an UndoRecordRelationDetails structure
+ * follows.
+ *
+ * If UREC_INFO_BLOCK is set, an UndoRecordBlock structure follows.
+ *
+ * If UREC_INFO_PAYLOAD is set, an UndoRecordPayload structure follows.
+ *
+ * If UREC_INFO_TRANSACTION is set, an UndoRecordTransaction structure is
+ * also present.
+ *
+ * When (as will often be the case) multiple structures are present, they
+ * appear in the same order in which the constants are defined here.  That is,
+ * UndoRecordRelationDetails appears first.
+ */
+#define UREC_INFO_RELATION_DETAILS			0x01
+#define UREC_INFO_BLOCK						0x02
+#define UREC_INFO_PAYLOAD					0x04
+#define UREC_INFO_TRANSACTION				0x08
+#define UREC_INFO_PAYLOAD_CONTAINS_SLOT		0x10
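+/*
+ * For example (illustrative only), a record describing a change to block 7
+ * of a relation in the default tablespace and MAIN_FORKNUM that carries a
+ * payload would omit UndoRecordRelationDetails and have
+ * urec_info = UREC_INFO_BLOCK | UREC_INFO_PAYLOAD; see UndoRecordSetInfo()
+ * in undorecord.c for how these bits are derived from an UnpackedUndoRecord.
+ */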
+/*
+ * Additional information about a relation to which this record pertains,
+ * namely the tablespace OID and fork number.  If the tablespace OID is
+ * DEFAULTTABLESPACE_OID and the fork number is MAIN_FORKNUM, this structure
+ * can (and should) be omitted.
+ */
+typedef struct UndoRecordRelationDetails
+{
+	Oid			urec_tsid;		/* tablespace OID */
+	ForkNumber		urec_fork;		/* fork number */
+} UndoRecordRelationDetails;
+
+#define SizeOfUndoRecordRelationDetails \
+	(offsetof(UndoRecordRelationDetails, urec_fork) + sizeof(uint8))
+
+/*
+ * Identifying information for a block to which this record pertains, and
+ * a pointer to the previous record for the same block.
+ */
+typedef struct UndoRecordBlock
+{
+	uint64		urec_blkprev;	/* byte offset of previous undo for block */
+	BlockNumber urec_block;		/* block number */
+	OffsetNumber urec_offset;	/* offset number */
+} UndoRecordBlock;
+
+#define SizeOfUndoRecordBlock \
+	(offsetof(UndoRecordBlock, urec_offset) + sizeof(OffsetNumber))
+
+/*
+ * Identifying information for the transaction to which this undo record
+ * belongs.  It will also store the total size of the undo for this
+ * transaction.
+ */
+typedef struct UndoRecordTransaction
+{
+	uint32			urec_progress;  /* undo applying progress. */
+	uint32			urec_xidepoch;  /* epoch of the current transaction */
+	Oid				urec_dbid;		/* database id */
+	uint64			urec_next;		/* urec pointer of the next transaction */
+} UndoRecordTransaction;
+
+#define SizeOfUrecNext (sizeof(UndoRecPtr))
+#define SizeOfUndoRecordTransaction \
+	(offsetof(UndoRecordTransaction, urec_next) + SizeOfUrecNext)
+
+#define urec_next_pos \
+	(SizeOfUndoRecordTransaction - SizeOfUrecNext)
+
+/*
+ * Information about the amount of payload data and tuple data present
+ * in this record.  The payload bytes immediately follow the structures
+ * specified by flag bits in urec_info, and the tuple bytes follow the
+ * payload bytes.
+ */
+typedef struct UndoRecordPayload
+{
+	uint16		urec_payload_len;		/* # of payload bytes */
+	uint16		urec_tuple_len; /* # of tuple bytes */
+} UndoRecordPayload;
+
+#define SizeOfUndoRecordPayload \
+	(offsetof(UndoRecordPayload, urec_tuple_len) + sizeof(uint16))
+
+/*
+ * Information that can be used to create an undo record or that can be
+ * extracted from one previously created.  The raw undo record format is
+ * difficult to manage, so this structure provides a convenient intermediate
+ * form that is easier for callers to manage.
+ *
+ * When creating an undo record from an UnpackedUndoRecord, caller should
+ * set uur_info to 0.  It will be initialized by the first call to
+ * UndoRecordExpectedSize or InsertUndoRecord.
+ *
+ * When an undo record is decoded into an UnpackedUndoRecord, all fields
+ * will be initialized, but those for which no information is available
+ * will be set to invalid or default values, as appropriate.
+ */
+typedef struct UnpackedUndoRecord
+{
+	uint8		uur_type;		/* record type code */
+	uint8		uur_info;		/* flag bits */
+	uint16		uur_prevlen;	/* length of previous record */
+	Oid			uur_relfilenode;	/* relfilenode for relation */
+	TransactionId uur_prevxid;		/* transaction id */
+	TransactionId uur_xid;		/* transaction id */
+	CommandId	uur_cid;		/* command id */
+	Oid			uur_tsid;		/* tablespace OID */
+	ForkNumber	uur_fork;		/* fork number */
+	uint64		uur_blkprev;	/* byte offset of previous undo for block */
+	BlockNumber uur_block;		/* block number */
+	OffsetNumber uur_offset;	/* offset number */
+	Buffer		uur_buffer;		/* buffer in which undo record data points */
+	uint32		uur_xidepoch;	/* epoch of the inserting transaction. */
+	uint64		uur_next;		/* urec pointer of the next transaction */
+	Oid			uur_dbid;		/* database id*/
+
+	/*
+	 * Undo action apply progress: 0 = not started, 1 = completed.  In the
+	 * future this can also be used to show how much of the undo has been
+	 * applied so far, but currently only 0 and 1 are used.
+	 */
+	uint32         uur_progress;
+	StringInfoData uur_payload;	/* payload bytes */
+	StringInfoData uur_tuple;	/* tuple bytes */
+} UnpackedUndoRecord;
+
+/*
+ * Compute the number of bytes of storage that will be required to insert
+ * an undo record.  Sets uur->uur_info as a side effect.
+ */
+extern Size UndoRecordExpectedSize(UnpackedUndoRecord *uur);
+
+/*
+ * To insert an undo record, call InsertUndoRecord() repeatedly until it
+ * returns true.  For the first call, the given page should be the one which
+ * the caller has determined to contain the current insertion point,
+ * starting_byte should be the byte offset within that page which corresponds
+ * to the current insertion point, and *already_written should be 0.  The
+ * return value will be true if the entire record is successfully written
+ * into that page, and false if not.  In either case, *already_written will
+ * be updated to the number of bytes written by all InsertUndoRecord calls
+ * for this record to date.  If this function is called again to continue
+ * writing the record, the previous value for *already_written should be
+ * passed again, and starting_byte should be passed as sizeof(PageHeaderData)
+ * (since the record will continue immediately following the page header).
+ *
+ * This function sets uur->uur_info as a side effect.
+ */
+extern bool InsertUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_written, bool header_only);
+
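+/*
+ * A rough sketch of the insertion loop described above (illustrative only;
+ * how the caller obtains the successive undo pages is elided here):
+ *
+ *		int		already_written = 0;
+ *		int		starting_byte = <byte offset of the insertion point>;
+ *
+ *		while (!InsertUndoRecord(uur, page, starting_byte,
+ *								 &already_written, false))
+ *		{
+ *			page = <next undo page>;
+ *			starting_byte = SizeOfPageHeaderData;
+ *		}
+ */
+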
+/*
+ * Call UnpackUndoRecord() one or more times to unpack an undo record.  For
+ * the first call, starting_byte should be set to the beginning of the undo
+ * record within the specified page, and *already_decoded should be set to 0;
+ * the function will update it based on the number of bytes decoded.  The
+ * return value is true if the entire record was unpacked and false if the
+ * record continues on the next page.  In the latter case, the function
+ * should be called again with the next page, passing starting_byte as the
+ * sizeof(PageHeaderData).
+ */
+extern bool UnpackUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_decoded, bool header_only);
+
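+/*
+ * The decode loop mirrors the insertion loop above (again a sketch; fetching
+ * the successive undo pages is elided):
+ *
+ *		UnpackedUndoRecord uur = {0};
+ *		int		already_decoded = 0;
+ *
+ *		while (!UnpackUndoRecord(&uur, page, starting_byte,
+ *								 &already_decoded, false))
+ *		{
+ *			page = <next undo page>;
+ *			starting_byte = SizeOfPageHeaderData;
+ *		}
+ */
+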
+#endif   /* UNDORECORD_H */
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index 689c57c..73394c5 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -14,6 +14,7 @@
 #ifndef XACT_H
 #define XACT_H
 
+#include "access/undolog.h"
 #include "access/xlogreader.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
@@ -430,5 +431,6 @@ extern void ParseAbortRecord(uint8 info, xl_xact_abort *xlrec, xl_xact_parsed_ab
 extern void EnterParallelMode(void);
 extern void ExitParallelMode(void);
 extern bool IsInParallelMode(void);
+extern void SetCurrentUndoLocation(UndoRecPtr urec_ptr);
 
 #endif							/* XACT_H */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index e01d12e..8cfcd44 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -277,6 +277,7 @@ extern XLogRecPtr GetInsertRecPtr(void);
 extern XLogRecPtr GetFlushRecPtr(void);
 extern XLogRecPtr GetLastImportantRecPtr(void);
 extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch);
+extern uint32 GetEpochForXid(TransactionId xid);
 extern void RemovePromoteSignalFiles(void);
 
 extern bool CheckPromoteSignal(void);
-- 
1.8.3.1

