From 869eab6d27291ea0740d905613f341ade3577bed Mon Sep 17 00:00:00 2001
From: Dilip Kumar <dilipkumar@localhost.localdomain>
Date: Thu, 2 May 2019 11:28:13 +0530
Subject: [PATCH] Provide interfaces to store and fetch undo records.

Add the capability to form undo records and store them in undo logs.  We
also provide the capability to fetch the undo records.  This layer will use
undo-log-storage to reserve the space for the undo records and buffer
management routines to write and read the undo records.

Undo records are stored in sequential order in the undo log.  Each undo
record consists of a variable length header, tuple data, and payload
information.  The undo records are stored without any sort of alignment
padding, and an undo record can span multiple pages.  The undo records
for a transaction can also span multiple undo logs.

Author: Dilip Kumar with contributions from Robert Haas, Amit Kapila,
        Thomas Munro and Rafia Sabih
Reviewed-by: Amit Kapila (earlier version)
Tested-by: Neha Sharma
Discussion: https://www.postgresql.org/message-id/CAFiTN-uVxxopn0UZ64%3DF-sydbETBbGjWapnBikNo1%3DXv78UeFw%40mail.gmail.com
---
 src/backend/access/undo/Makefile             |    2 +-
 src/backend/access/undo/README.undointerface |   29 +
 src/backend/access/undo/undoaccess.c         | 1495 ++++++++++++++++++++++++++
 src/backend/access/undo/undorecord.c         |  744 +++++++++++++
 src/include/access/transam.h                 |    1 +
 src/include/access/undoaccess.h              |  115 ++
 src/include/access/undorecord.h              |  231 ++++
 7 files changed, 2616 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/access/undo/README.undointerface
 create mode 100644 src/backend/access/undo/undoaccess.c
 create mode 100644 src/backend/access/undo/undorecord.c
 create mode 100644 src/include/access/undoaccess.h
 create mode 100644 src/include/access/undorecord.h

diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile
index 219c696..f41e8f7 100644
--- a/src/backend/access/undo/Makefile
+++ b/src/backend/access/undo/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undolog.o
+OBJS = undoaccess.o undolog.o undorecord.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/undo/README.undointerface b/src/backend/access/undo/README.undointerface
new file mode 100644
index 0000000..41e2c0e
--- /dev/null
+++ b/src/backend/access/undo/README.undointerface
@@ -0,0 +1,29 @@
+Undo record interface layer
+---------------------------
+This layer sits on top of the undo log storage and provides interfaces to
+prepare, insert, and fetch undo records.  It uses the undo-log-storage module
+to reserve space for the undo records and the buffer management routines to
+write and read the undo records.
+
+Writing an undo record
+----------------------
+To write an undo record, the caller first prepares it.  Preparing a record
+allocates the required space using the undo log storage module, pins and
+locks the required buffers, and returns the undo record pointer at which the
+record will be inserted.  Finally, the caller invokes the insert routine for
+the actual insertion of the prepared record.  Additionally, there is a
+mechanism for multi-insert, wherein multiple records are prepared and
+inserted at a time.
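+
+For illustration, here is a minimal sketch of the write path (assuming a
+single record in a permanent undo log; WAL details and the record's fields
+are elided):
+
+    UndoRecordInsertContext context = {{0}};
+    UnpackedUndoRecord undorecord = {0};
+    UndoRecPtr urecptr;
+
+    /* ... fill in undorecord (uur_rmid, uur_reloid, payload, etc.) ... */
+
+    BeginUndoRecordInsert(&context, UNDO_PERMANENT, 1, NULL);
+    urecptr = PrepareUndoInsert(&context, &undorecord,
+                                GetTopFullTransactionId());
+
+    START_CRIT_SECTION();
+    InsertPreparedUndo(&context);
+    /* ... XLogBeginInsert(), RegisterUndoLogBuffers(), XLogInsert() ... */
+    END_CRIT_SECTION();
+
+    FinishUndoRecordInsert(&context);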
+
+Fetching an undo record
+-----------------------
+To fetch an undo record, a caller must provide a valid undo record pointer.
+Optionally, the caller can provide a callback function with the information
+of the block and offset, which will help in faster retrieval of the undo
+record; otherwise, it has to traverse the undo chain.
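+
+For example, a minimal sketch of fetching one record (the callback shown is
+hypothetical; any SatisfyUndoRecordCallback-compatible function works):
+
+    UndoRecPtr urec_ptr;
+    UnpackedUndoRecord *uur;
+
+    uur = UndoFetchRecord(urp, blkno, offset, InvalidTransactionId,
+                          &urec_ptr, my_callback);
+    if (uur != NULL)
+    {
+        /* ... examine the record ... */
+        UndoRecordRelease(uur);
+    }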
+
+There is also an interface to fetch undo records in bulk.  The caller
+provides FROM and TO undo record pointers and a memory limit for storing the
+undo records.  This API returns all the undo records between the FROM and TO
+undo record pointers if they fit into the provided memory limit; otherwise,
+it returns whatever fits, and the caller can call it repeatedly until all
+the records are fetched.
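+
+For instance, a sketch of draining a transaction's undo in batches (variable
+names and the memory limit are illustrative):
+
+    UndoRecPtr from_urecptr = start_urecptr;   /* transaction's latest undo */
+    int        nrecords;
+
+    do
+    {
+        UndoRecInfo *urp_array;
+
+        urp_array = UndoBulkFetchRecord(&from_urecptr, to_urecptr,
+                                        undo_apply_size, &nrecords, false);
+        /* ... consume nrecords entries, then free urp_array ... */
+    } while (UndoRecPtrIsValid(from_urecptr));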
diff --git a/src/backend/access/undo/undoaccess.c b/src/backend/access/undo/undoaccess.c
new file mode 100644
index 0000000..b3c9cf3
--- /dev/null
+++ b/src/backend/access/undo/undoaccess.c
@@ -0,0 +1,1495 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoaccess.c
+ *	  entry points for inserting/fetching undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undoaccess.c
+ *
+ * NOTES:
+ * Undo record layout:
+ *
+ * Undo records are stored in sequential order in the undo log.  Each undo
+ * record consists of a variable length header, tuple data, and payload
+ * information.  The first undo record of each transaction contains a
+ * transaction header that points to the next transaction's start header.
+ * This allows us to discard the entire transaction's undo in one shot rather
+ * than record by record.  The callers are not aware of the transaction
+ * header; it is entirely maintained and used by the undo record layer.  See
+ * undorecord.h for detailed information about the undo record header.
+ *
+ * Multiple logs:
+ *
+ * It is possible that the undo records for a transaction span multiple undo
+ * logs.  We need some special handling while inserting them to ensure that
+ * discard and rollback can work sanely.
+ *
+ * When a transaction's undo records spill into the next log, we add a
+ * transaction header to the first record of the transaction in the new log
+ * and connect that record to the first record of the transaction in the
+ * previous log by updating the "uur_next" field.
+ *
+ * We will also keep a previous undo record pointer to the last undo record of
+ * the transaction in the previous log, so that we can find the previous undo
+ * record pointer during rollback.
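+ *
+ * For example (a schematic sketch; record positions are illustrative):
+ *
+ *    undo log 1:  [T's first record (xact hdr)] ... [T's last record]
+ *    undo log 2:  [T's first record (new xact hdr)] ...
+ *
+ * Here the "uur_next" of T's first record in log 1 points to T's first
+ * record in log 2, and the new header's previous undo record pointer
+ * ("uur_prevurp") points back to T's last record in log 1.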
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/transam.h"
+#include "access/undorecord.h"
+#include "access/undoaccess.h"
+#include "access/undolog_xlog.h"
+#include "access/xact.h"
+#include "access/xlog.h"
+#include "access/xlogutils.h"
+#include "catalog/pg_tablespace.h"
+#include "commands/tablecmds.h"
+#include "storage/block.h"
+#include "storage/buf.h"
+#include "storage/buf_internals.h"
+#include "storage/bufmgr.h"
+#include "miscadmin.h"
+
+/* Prototypes for static functions. */
+static UnpackedUndoRecord *UndoGetOneRecord(UnpackedUndoRecord *urec,
+				 UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence,
+				 Buffer *curbuf);
+static void UndoRecordPrepareTransInfo(UndoRecordInsertContext *context,
+						   UndoRecPtr urecptr,
+						   UndoRecPtr xact_urp);
+static void UndoRecordUpdateTransInfo(UndoRecordInsertContext *context,
+						  int idx);
+static int UndoGetBufferSlot(UndoRecordInsertContext *context,
+				  RelFileNode rnode, BlockNumber blk,
+				  ReadBufferMode rbm);
+static uint16 UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer,
+					 UndoPersistence upersistence);
+
+/*
+ * Structure to hold the prepared undo information.
+ */
+struct PreparedUndoSpace
+{
+	UndoRecPtr	urp;			/* undo record pointer */
+	UnpackedUndoRecord *urec;	/* undo record */
+	uint16		size;			/* undo record size */
+	int			undo_buffer_idx[MAX_BUFFER_PER_UNDO];	/* undo_buffer array
+														 * index */
+};
+
+/*
+ * This holds the undo buffer information required for PreparedUndoSpace
+ * during prepare time.  The prepare phase runs outside the critical section,
+ * and we acquire pins and locks on all necessary undo buffers there.  Later,
+ * during the insert phase, we write the actual records into these buffers.
+ */
+struct PreparedUndoBuffer
+{
+	UndoLogNumber logno;		/* Undo log number */
+	BlockNumber blk;			/* block number */
+	Buffer		buf;			/* buffer allocated for the block */
+	bool		zero;			/* new block full of zeroes */
+};
+
+/*
+ * Prepare to update the previous transaction's next undo pointer.
+ *
+ * We want to update the next transaction pointer in the previous transaction's
+ * header (first undo record of the transaction).  In prepare phase we will
+ * unpack that record and lock the necessary buffers which we are going to
+ * overwrite and store the unpacked undo record in the context.  Later,
+ * UndoRecordUpdateTransInfo will overwrite the undo record.
+ *
+ * xact_urp - undo record pointer of the previous transaction's header
+ * urecptr - current transaction's undo record pointer, which needs to be set
+ *			 in the previous transaction's header.
+ */
+static void
+UndoRecordPrepareTransInfo(UndoRecordInsertContext *context, UndoRecPtr urecptr,
+						   UndoRecPtr xact_urp)
+{
+	Buffer		buffer = InvalidBuffer;
+	BlockNumber cur_blk;
+	Page		page;
+	RelFileNode rnode;
+	UndoLogControl *log;
+	UndoPackContext ucontext = {{0}};
+	XactUndoRecordInfo *xact_info =
+	&context->xact_urec_info[context->nxact_urec_info];
+	int			starting_byte;
+	int			bufidx;
+	int			index = 0;
+
+	/*
+	 * The absence of the previous transaction's undo indicates that this
+	 * backend is preparing its first undo, in which case we have nothing to
+	 * update.
+	 */
+	if (!UndoRecPtrIsValid(xact_urp))
+		return;
+
+	log = UndoLogGet(UndoRecPtrGetLogNo(xact_urp), false);
+
+	/*
+	 * Temporary undo logs are discarded on transaction commit so we don't
+	 * need to do anything.
+	 */
+	if (log->meta.persistence == UNDO_TEMP)
+		return;
+
+	/*
+	 * Acquire the discard lock before reading the undo record so that the
+	 * discard worker doesn't remove the record while we are reading it.
+	 */
+	LWLockAcquire(&log->discard_lock, LW_SHARED);
+
+	/*
+	 * If the previous transaction's undo record has already been discarded
+	 * then we have nothing to update.
+	 *
+	 * Refer to the comments in UndoFetchRecord.
+	 */
+	if (InHotStandby)
+	{
+		if (UndoLogIsDiscarded(xact_urp))
+		{
+			LWLockRelease(&log->discard_lock);
+			return;
+		}
+	}
+	else if (xact_urp < log->oldest_data)
+	{
+		LWLockRelease(&log->discard_lock);
+		return;
+	}
+
+	UndoRecPtrAssignRelFileNode(rnode, xact_urp);
+	cur_blk = UndoRecPtrGetBlockNum(xact_urp);
+	starting_byte = UndoRecPtrGetPageOffset(xact_urp);
+
+	/* Initiate reading the undo record. */
+	BeginUnpackUndo(&ucontext);
+	while (1)
+	{
+		bufidx = UndoGetBufferSlot(context, rnode, cur_blk, RBM_NORMAL);
+		xact_info->idx_undo_buffers[index++] = bufidx;
+		buffer = context->prepared_undo_buffers[bufidx].buf;
+		page = BufferGetPage(buffer);
+
+		/* Do actual decoding. */
+		UnpackUndoData(&ucontext, page, starting_byte);
+
+		/* We just want to fetch up to the transaction header, so stop after that. */
+		if (ucontext.stage > UNDO_PACK_STAGE_TRANSACTION)
+			break;
+
+		/* Could not fetch the complete header so go to the next block. */
+		starting_byte = UndoLogBlockHeaderSize;
+		cur_blk++;
+	}
+
+	FinishUnpackUndo(&ucontext, &xact_info->uur);
+
+	/* We can now release the discard lock as we have read the undo record. */
+	LWLockRelease(&log->discard_lock);
+
+	/*
+	 * Set the current transaction's undo record pointer in the previous
+	 * transaction's undo record header.
+	 */
+	xact_info->uur.uur_next = urecptr;
+
+	/*
+	 * Store undo record pointer of the previous transaction's header in the
+	 * context because we need to overwrite the header undo record in the
+	 * update phase.
+	 */
+	xact_info->urecptr = xact_urp;
+
+	context->nxact_urec_info++;
+}
+
+/*
+ * Overwrite the first undo record of the previous transaction to update its
+ * next pointer.
+ *
+ * This overwrites the record already prepared by UndoRecordPrepareTransInfo.
+ * It must be called inside a critical section and only overwrites the header
+ * of the undo record.
+ */
+static void
+UndoRecordUpdateTransInfo(UndoRecordInsertContext *context, int idx)
+{
+	Page		page = NULL;
+	int			starting_byte;
+	int			i = 0;
+	UndoPackContext ucontext = {{0}};
+	XactUndoRecordInfo *xact_info = &context->xact_urec_info[idx];
+
+	/*
+	 * Update the next transaction's start urecptr in the transaction header.
+	 */
+	starting_byte = UndoRecPtrGetPageOffset(xact_info->urecptr);
+
+	/* Initiate inserting the undo record. */
+	BeginInsertUndo(&ucontext, &xact_info->uur);
+
+	/* Main loop for updating the undo record. */
+	while (1)
+	{
+		Buffer		buffer;
+		int			buf_idx;
+
+		buf_idx = xact_info->idx_undo_buffers[i];
+		buffer = context->prepared_undo_buffers[buf_idx].buf;
+
+		/*
+		 * During recovery, there might be some blocks which are already
+		 * removed by the discard process, so we can just skip inserting into
+		 * those blocks.
+		 */
+		if (!BufferIsValid(buffer))
+		{
+			Assert(InRecovery);
+
+			/*
+			 * If the buffer is not valid then skip the actual write and just
+			 * advance the write offset in the context, so that if the next
+			 * buffer is valid we have the correct offset of the record for
+			 * inserting into that buffer.
+			 */
+			SkipInsertingUndoData(&ucontext, BLCKSZ - starting_byte);
+			if (ucontext.stage > UNDO_PACK_STAGE_TRANSACTION)
+				break;
+		}
+		else
+		{
+			page = BufferGetPage(buffer);
+
+			/* Overwrite the previously written undo record. */
+			InsertUndoData(&ucontext, page, starting_byte);
+
+			/*
+			 * We only want to overwrite the transaction header so if we have
+			 * already done so then stop.
+			 */
+			if (ucontext.stage > UNDO_PACK_STAGE_TRANSACTION)
+			{
+				MarkBufferDirty(buffer);
+				break;
+			}
+			MarkBufferDirty(buffer);
+		}
+
+		/*
+		 * The record header is split across blocks, so go to the next block and
+		 * continue writing there.  Start writing after the undo block header.
+		 */
+		starting_byte = UndoLogBlockHeaderSize;
+		i++;
+
+		Assert(i < MAX_BUFFER_PER_UNDO);
+	}
+}
+
+/*
+ * Find the block number in the undo buffer array.
+ *
+ * If it is present then just return its index, otherwise read the buffer,
+ * insert an entry into the array, and lock the buffer in exclusive mode.
+ *
+ * Undo log insertions are append-only.  If the caller is writing new data
+ * that begins exactly at the beginning of a page, then there cannot be any
+ * useful data after that point.  In that case RBM_ZERO can be passed in as
+ * rbm so that we can skip a useless read of a disk block.  In all other
+ * cases, RBM_NORMAL should be passed in, to read the page in if it doesn't
+ * happen to be already in the buffer pool.
+ */
+static int
+UndoGetBufferSlot(UndoRecordInsertContext *context,
+				  RelFileNode rnode,
+				  BlockNumber blk,
+				  ReadBufferMode rbm)
+{
+	int			i;
+	Buffer		buffer;
+	XLogRedoAction action = BLK_NEEDS_REDO;
+	PreparedUndoBuffer *prepared_buffer;
+	UndoPersistence persistence = context->alloc_context.persistence;
+
+	/* Don't do anything, if we already have a buffer pinned for the block. */
+	for (i = 0; i < context->nprepared_undo_buffer; i++)
+	{
+		prepared_buffer = &context->prepared_undo_buffers[i];
+
+		/*
+		 * It's not enough to just compare the block number because the
+		 * undo_buffer might hold undo from different undo logs (e.g. when the
+		 * previous transaction's start header is in the previous undo log) so
+		 * compare (logno + blkno).
+		 */
+		if ((blk == prepared_buffer->blk) &&
+			(prepared_buffer->logno == rnode.relNode))
+		{
+			/* caller must hold exclusive lock on buffer */
+			Assert(BufferIsLocal(prepared_buffer->buf) ||
+				   LWLockHeldByMeInMode(BufferDescriptorGetContentLock(
+																	   GetBufferDescriptor(prepared_buffer->buf - 1)),
+										LW_EXCLUSIVE));
+			return i;
+		}
+	}
+
+	/*
+	 * We did not find the block, so allocate the buffer and insert it into
+	 * the undo buffer array.
+	 */
+	if (InRecovery)
+		action = XLogReadBufferForRedoBlock(context->alloc_context.xlog_record,
+											SMGR_UNDO,
+											rnode,
+											UndoLogForkNum,
+											blk,
+											rbm,
+											false,
+											&buffer);
+	else
+	{
+		buffer = ReadBufferWithoutRelcache(SMGR_UNDO,
+										   rnode,
+										   UndoLogForkNum,
+										   blk,
+										   rbm,
+										   NULL,
+										   RelPersistenceForUndoPersistence(persistence));
+
+		/* Lock the buffer */
+		LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+	}
+
+	prepared_buffer =
+		&context->prepared_undo_buffers[context->nprepared_undo_buffer];
+
+	if (action == BLK_NOTFOUND)
+	{
+		Assert(InRecovery);
+
+		prepared_buffer->buf = InvalidBuffer;
+		prepared_buffer->blk = InvalidBlockNumber;
+	}
+	else
+	{
+		prepared_buffer->buf = buffer;
+		prepared_buffer->blk = blk;
+		prepared_buffer->logno = rnode.relNode;
+		prepared_buffer->zero = rbm == RBM_ZERO;
+	}
+
+	context->nprepared_undo_buffer++;
+
+	return i;
+}
+
+/*
+ * This function must be called before preparing the undo records which are
+ * going to be inserted under a single WAL record.
+ *
+ * nprepared - This defines the max number of undo records that can be
+ * prepared before inserting them.
+ */
+void
+BeginUndoRecordInsert(UndoRecordInsertContext *context,
+					  UndoPersistence persistence,
+					  int nprepared,
+					  XLogReaderState *xlog_record)
+{
+	uint32		nbuffers;
+
+	/* At least one prepared record should be there. */
+	if (nprepared <= 0)
+		elog(ERROR, "at least one undo record should be prepared");
+
+	/* Initialize undo log context. */
+	UndoLogBeginInsert(&context->alloc_context, persistence, xlog_record);
+
+	/* Initialize undo insert context. */
+	context->max_prepared_undo = nprepared;
+	context->nprepared_undo = 0;
+	context->nprepared_undo_buffer = 0;
+	context->nxact_urec_info = 0;
+
+	/* Allocate memory for prepared undo record space. */
+	context->prepared_undo = (PreparedUndoSpace *) palloc(nprepared *
+														  sizeof(PreparedUndoSpace));
+
+	/* Compute number of buffers. */
+	nbuffers = (nprepared + MAX_XACT_UNDO_INFO) * MAX_BUFFER_PER_UNDO;
+
+	/* Allocate memory for the prepared buffers. */
+	context->prepared_undo_buffers =
+		palloc(nbuffers * sizeof(PreparedUndoBuffer));
+}
+
+/*
+ * Call PrepareUndoInsert to tell the undo subsystem about the undo record
+ * you intend to insert.  Upon return, the necessary undo buffers are pinned
+ * and locked.
+ *
+ * This should be done before any critical section is established, since it
+ * can fail.
+ *
+ * In recovery, 'fxid' refers to the full transaction id stored in WAL,
+ * otherwise, it refers to the top full transaction id.
+ */
+UndoRecPtr
+PrepareUndoInsert(UndoRecordInsertContext *context,
+				  UnpackedUndoRecord *urec,
+				  FullTransactionId fxid)
+{
+	UndoRecordSize size;
+	UndoRecPtr	urecptr;
+	RelFileNode rnode;
+	UndoRecordSize cur_size = 0;
+	BlockNumber cur_blk;
+	FullTransactionId txid;
+	int			starting_byte;
+	int			index = 0;
+	int			bufidx;
+	ReadBufferMode rbm;
+	bool		need_xact_header;
+	UndoRecPtr	last_xact_start;
+	UndoRecPtr	prevlog_xact_start = InvalidUndoRecPtr;
+	UndoRecPtr	prevlog_insert_urp = InvalidUndoRecPtr;
+	UndoRecPtr	prevlogurp = InvalidUndoRecPtr;
+	PreparedUndoSpace *prepared_undo;
+
+	/* Already reached maximum prepared limit. */
+	if (context->nprepared_undo == context->max_prepared_undo)
+		elog(ERROR, "already reached the maximum prepared limit");
+
+	if (!FullTransactionIdIsValid(fxid))
+	{
+		/* During recovery, we must have a valid transaction id. */
+		Assert(!InRecovery);
+		txid = GetTopFullTransactionId();
+	}
+	else
+	{
+		/*
+		 * Assign the top transaction id because the undo log only stores the
+		 * mapping for the topmost transactions.
+		 */
+		Assert(InRecovery ||
+			   FullTransactionIdEquals(fxid, GetTopFullTransactionId()));
+		txid = fxid;
+	}
+
+	/*
+	 * We don't yet know if this record needs a transaction header (i.e. is the
+	 * first undo record for a given transaction in a given undo log), because
+	 * you can only find out by allocating.  We'll resolve this circularity by
+	 * allocating enough space for a transaction header.  We'll only advance
+	 * by as many bytes as we turn out to need.
+	 */
+	urec->uur_next = InvalidUndoRecPtr;
+	UndoRecordSetInfo(urec);
+	urec->uur_info |= UREC_INFO_TRANSACTION;
+	size = UndoRecordExpectedSize(urec);
+
+	/* Allocate space for the record. */
+	if (InRecovery)
+	{
+		/*
+		 * We'll figure out where the space needs to be allocated by
+		 * inspecting the xlog_record.
+		 */
+		Assert(context->alloc_context.persistence == UNDO_PERMANENT);
+		urecptr = UndoLogAllocateInRecovery(&context->alloc_context,
+											XidFromFullTransactionId(txid),
+											size,
+											&need_xact_header,
+											&last_xact_start,
+											&prevlog_xact_start,
+											&prevlogurp);
+	}
+	else
+	{
+		/* Allocate space for writing the undo record. */
+		urecptr = UndoLogAllocate(&context->alloc_context,
+								  size,
+								  &need_xact_header, &last_xact_start,
+								  &prevlog_xact_start, &prevlog_insert_urp);
+
+		/*
+		 * If prevlog_xact_start is a valid undo record pointer, that means
+		 * this transaction's undo records are split across undo logs.
+		 */
+		if (UndoRecPtrIsValid(prevlog_xact_start))
+		{
+			uint16		prevlen;
+
+			/*
+			 * If the undo log was switched during the transaction then we
+			 * must get a valid insert location in the previous undo log so
+			 * that we can compute the undo record pointer of the
+			 * transaction's last record in the previous undo log.
+			 */
+			Assert(UndoRecPtrIsValid(prevlog_insert_urp));
+
+			/* Fetch length of the last undo record of the previous log. */
+			prevlen = UndoGetPrevRecordLen(prevlog_insert_urp, InvalidBuffer,
+										   context->alloc_context.persistence);
+
+			/*
+			 * If the undo log got switched during the transaction then, for
+			 * collecting all the undo records of the transaction during a
+			 * bulk fetch, we cannot read the prevlen from the end of the
+			 * record, as we would not know which undo log was the previous
+			 * one.  So during a log switch we directly store the last undo
+			 * record pointer of the transaction into the transaction's first
+			 * record of the next undo log.
+			 *
+			 * TODO:  instead of storing this in the transaction header we can
+			 * have separate undo log switch header and store it there.
+			 */
+			prevlogurp =
+				MakeUndoRecPtr(UndoRecPtrGetLogNo(prevlog_insert_urp),
+							   (UndoRecPtrGetOffset(prevlog_insert_urp) - prevlen));
+
+			/*
+			 * Undo log switched so set prevlog info in current undo log.
+			 *
+			 * XXX can we do this directly in UndoLogAllocate?  For that,
+			 * UndoLogAllocate might need to read the length of the last
+			 * undo record from the previous undo log, perhaps via a
+			 * callback.
+			 */
+			UndoLogSwitchSetPrevLogInfo(UndoRecPtrGetLogNo(urecptr),
+										prevlog_xact_start, prevlogurp);
+		}
+	}
+
+	urec->uur_prevurp = prevlogurp;
+
+	/* Initialize transaction related members. */
+	urec->uur_progress = 0;
+	if (need_xact_header)
+	{
+		/*
+		 * TODO: Should we set urec->uur_dbid automatically?  How can you do
+		 * that, in recovery -- can we extract it from xlog_record?  For now
+		 * assume that the caller set it explicitly.
+		 */
+	}
+	else
+	{
+		urec->uur_dbid = 0;
+
+		/* We don't need a transaction header after all. */
+		urec->uur_info &= ~UREC_INFO_TRANSACTION;
+		size = UndoRecordExpectedSize(urec);
+	}
+
+	/*
+	 * If there is a physically preceding transaction in this undo log, and we
+	 * are writing the first record for this transaction that is in this undo
+	 * log (not necessarily the first ever for the transaction, because we
+	 * could have switched logs), then we need to update the transaction
+	 * header of the preceding transaction.
+	 */
+	if (need_xact_header &&
+		UndoRecPtrGetOffset(urecptr) > UndoLogBlockHeaderSize)
+		UndoRecordPrepareTransInfo(context, urecptr, last_xact_start);
+
+	/*
+	 * If prevlog_xact_start is valid, that means the transaction's undo
+	 * records are split across undo logs.  So we need to update our own
+	 * transaction header in the previous log as well.
+	 */
+	if (UndoRecPtrIsValid(prevlog_xact_start))
+	{
+		Assert(UndoRecPtrIsValid(prevlogurp));
+		UndoRecordPrepareTransInfo(context, urecptr, prevlog_xact_start);
+	}
+
+	cur_blk = UndoRecPtrGetBlockNum(urecptr);
+	UndoRecPtrAssignRelFileNode(rnode, urecptr);
+	starting_byte = UndoRecPtrGetPageOffset(urecptr);
+
+	/*
+	 * If we happen to be writing the very first byte into this page, then
+	 * there is no need to read from disk.
+	 */
+	if (starting_byte == UndoLogBlockHeaderSize)
+		rbm = RBM_ZERO;
+	else
+		rbm = RBM_NORMAL;
+
+	prepared_undo = &context->prepared_undo[context->nprepared_undo];
+
+	do
+	{
+		bufidx = UndoGetBufferSlot(context, rnode, cur_blk, rbm);
+		if (cur_size == 0)
+			cur_size = BLCKSZ - starting_byte;
+		else
+			cur_size += BLCKSZ - UndoLogBlockHeaderSize;
+
+		/* An undo record can't use more than MAX_BUFFER_PER_UNDO buffers. */
+		Assert(index < MAX_BUFFER_PER_UNDO);
+
+		/* Keep track of the buffers we have pinned and locked. */
+		prepared_undo->undo_buffer_idx[index++] = bufidx;
+
+		/*
+		 * If we need more pages, they'll all be new, so we can definitely
+		 * skip reading from disk.
+		 */
+		rbm = RBM_ZERO;
+		cur_blk++;
+	} while (cur_size < size);
+
+	UndoLogAdvance(&context->alloc_context, size);
+
+	/*
+	 * Save prepared undo record information into the context which will be
+	 * used by InsertPreparedUndo to insert the undo record.
+	 */
+	prepared_undo->urec = urec;
+	prepared_undo->urp = urecptr;
+	prepared_undo->size = size;
+
+	context->nprepared_undo++;
+
+	return urecptr;
+}
+
+/*
+ * Insert previously-prepared undo records.
+ *
+ * This function will write the actual undo records into the buffers which
+ * were already pinned and locked in PrepareUndoInsert, and mark them dirty.
+ * This step should be performed inside a critical section.
+ */
+void
+InsertPreparedUndo(UndoRecordInsertContext *context)
+{
+	UndoPackContext ucontext = {{0}};
+	PreparedUndoSpace *prepared_undo;
+	Page		page = NULL;
+	int			starting_byte;
+	int			bufidx = 0;
+	int			idx;
+	int			i;
+
+	/* There must be at least one prepared undo record. */
+	Assert(context->nprepared_undo > 0);
+
+	/*
+	 * This must be called inside a critical section, or we must be in recovery.
+	 */
+	Assert(InRecovery || CritSectionCount > 0);
+
+	for (idx = 0; idx < context->nprepared_undo; idx++)
+	{
+		prepared_undo = &context->prepared_undo[idx];
+
+		Assert(prepared_undo->size ==
+			   UndoRecordExpectedSize(prepared_undo->urec));
+
+		bufidx = 0;
+
+		/*
+		 * Compute the starting offset on the page at which to start
+		 * inserting the undo record.
+		 */
+		starting_byte = UndoRecPtrGetPageOffset(prepared_undo->urp);
+
+		/* Initiate inserting the undo record. */
+		BeginInsertUndo(&ucontext, prepared_undo->urec);
+
+		/* Main loop for writing the undo record. */
+		do
+		{
+			Buffer		buffer;
+
+			buffer = context->prepared_undo_buffers[
+													prepared_undo->undo_buffer_idx[bufidx]].buf;
+
+			/*
+			 * During recovery, there might be some blocks which have already
+			 * been removed by the discard process, so we can just skip
+			 * inserting into those blocks.
+			 */
+			if (!BufferIsValid(buffer))
+			{
+				Assert(InRecovery);
+
+				/*
+				 * Skip the actual write and just update the context so that
+				 * we have the correct write offset for the next blocks.
+				 */
+				SkipInsertingUndoData(&ucontext, BLCKSZ - starting_byte);
+				if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+					break;
+			}
+			else
+			{
+				page = BufferGetPage(buffer);
+
+				/*
+				 * Initialize the page whenever we try to write the first
+				 * record of the page.  We start writing immediately after
+				 * the block header.
+				 */
+				if (starting_byte == UndoLogBlockHeaderSize)
+					PageInit(page, BLCKSZ, 0);
+
+				/*
+				 * Try to insert the record into the current page.  If it
+				 * doesn't succeed then call the routine again with the next
+				 * page.
+				 */
+				InsertUndoData(&ucontext, page, starting_byte);
+				if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+				{
+					MarkBufferDirty(buffer);
+					break;
+				}
+				MarkBufferDirty(buffer);
+			}
+
+			/* Insert remaining record in next block. */
+			starting_byte = UndoLogBlockHeaderSize;
+			bufidx++;
+
+			/* An undo record can't use more than MAX_BUFFER_PER_UNDO buffers. */
+			Assert(bufidx < MAX_BUFFER_PER_UNDO);
+		} while (true);
+
+		/* Advance the insert pointer past this record. */
+		UndoLogAdvanceFinal(prepared_undo->urp, prepared_undo->size);
+	}
+
+	/* Update previously prepared transaction headers. */
+	for (i = 0; i < context->nxact_urec_info; i++)
+		UndoRecordUpdateTransInfo(context, i);
+}
+
+/*
+ * Release all the memory and buffer pins held for inserting the undo records.
+ */
+void
+FinishUndoRecordInsert(UndoRecordInsertContext *context)
+{
+	int			i;
+
+	/* Release buffer pins and locks. */
+	for (i = 0; i < context->nprepared_undo_buffer; i++)
+	{
+		if (BufferIsValid(context->prepared_undo_buffers[i].buf))
+			UnlockReleaseBuffer(context->prepared_undo_buffers[i].buf);
+	}
+
+	/* Free memory allocated for the prepared undo and prepared buffers. */
+	pfree(context->prepared_undo_buffers);
+	pfree(context->prepared_undo);
+}
+
+/*
+ * Helper function for UndoFetchRecord and UndoBulkFetchRecord
+ *
+ * curbuf - If the input buffer is valid then this function will not release
+ * the pin on that buffer.  If the buffer is not valid then it will set curbuf
+ * to the first buffer of the current undo record and keep the pin and lock on
+ * that buffer, in the hope that while traversing the undo chain the caller
+ * might want to read the previous undo record from the same block.
+ */
+static UnpackedUndoRecord *
+UndoGetOneRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode rnode,
+				 UndoPersistence persistence, Buffer *curbuf)
+{
+	Page		page;
+	int			starting_byte = UndoRecPtrGetPageOffset(urp);
+	BlockNumber cur_blk;
+	UndoPackContext ucontext = {{0}};
+	Buffer		buffer = *curbuf;
+
+	cur_blk = UndoRecPtrGetBlockNum(urp);
+
+	/* Initiate unpacking one undo record. */
+	BeginUnpackUndo(&ucontext);
+
+	while (true)
+	{
+		/* If we already have a buffer then no need to allocate a new one. */
+		if (!BufferIsValid(buffer))
+		{
+			buffer = ReadBufferWithoutRelcache(SMGR_UNDO,
+											   rnode, UndoLogForkNum, cur_blk,
+											   RBM_NORMAL, NULL,
+											   RelPersistenceForUndoPersistence(persistence));
+
+			/*
+			 * Remember the first buffer where this undo record started, as
+			 * the next undo record we fetch might fall on the same buffer.
+			 */
+			if (!BufferIsValid(*curbuf))
+				*curbuf = buffer;
+
+			/* Acquire shared lock on the buffer before reading undo from it. */
+			LockBuffer(buffer, BUFFER_LOCK_SHARE);
+		}
+
+		page = BufferGetPage(buffer);
+
+		UnpackUndoData(&ucontext, page, starting_byte);
+
+		/*
+		 * We are done if we have reached the done stage; otherwise move to
+		 * the next block and continue reading from there.
+		 */
+		if (ucontext.stage == UNDO_PACK_STAGE_DONE)
+		{
+			if (buffer != *curbuf)
+				UnlockReleaseBuffer(buffer);
+			break;
+		}
+
+		/*
+		 * The record spans more than a page, so we would have copied it (see
+		 * UnpackUndoData).  In such cases, we can release the buffer.
+		 */
+		if (buffer != *curbuf)
+			UnlockReleaseBuffer(buffer);
+		buffer = InvalidBuffer;
+
+		/* Go to next block. */
+		cur_blk++;
+		starting_byte = UndoLogBlockHeaderSize;
+	}
+
+	/* Final step of unpacking. */
+	FinishUnpackUndo(&ucontext, urec);
+
+	return urec;
+}
+
+/*
+ * Helper function for UndoFetchRecord to reset the unpacked undo record.
+ */
+static void
+ResetUndoRecord(UnpackedUndoRecord *urec, UndoRecPtr urp, RelFileNode *rnode,
+				RelFileNode *prevrec_rnode, Buffer *buffer)
+{
+	/*
+	 * If we have a valid buffer pinned then keep it only if we are going to
+	 * find the next record in the same block.  Otherwise release the buffer
+	 * and set it invalid.
+	 */
+	if (BufferIsValid(*buffer))
+	{
+		/*
+		 * Undo buffer will be changed if the next undo record belongs to a
+		 * different block or undo log.
+		 */
+		if ((UndoRecPtrGetBlockNum(urp) !=
+			 BufferGetBlockNumber(*buffer)) ||
+			(prevrec_rnode->relNode != rnode->relNode))
+		{
+			ReleaseBuffer(*buffer);
+			*buffer = InvalidBuffer;
+		}
+	}
+
+	if (urec->uur_payload.data)
+		pfree(urec->uur_payload.data);
+	if (urec->uur_tuple.data)
+		pfree(urec->uur_tuple.data);
+
+	/* Reset the urec before fetching the tuple */
+	urec->uur_tuple.data = NULL;
+	urec->uur_tuple.len = 0;
+	urec->uur_payload.data = NULL;
+	urec->uur_payload.len = 0;
+}
+
+/*
+ * Fetch undo record for given urp
+ *
+ * Fetch the next undo record for given blkno, offset and transaction id (if
+ * valid).  The same tuple can be modified by multiple transactions, so during
+ * undo chain traversal sometimes we need to distinguish based on transaction
+ * id.  Callers that don't have any such requirement can pass
+ * InvalidTransactionId.
+ *
+ * Start the search from urp.  The caller needs to call UndoRecordRelease to
+ * release the resources allocated by this function.
+ *
+ * urec_ptr_out is set to the undo record pointer of the qualifying undo
+ * record, if a valid pointer is passed.
+ *
+ * The callback function decides whether a particular undo record satisfies
+ * the caller's condition.
+ *
+ * Returns the required undo record if found; otherwise, returns NULL, which
+ * means either the record has already been discarded or there is no such
+ * record in the undo chain.
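+ *
+ * For example, a caller-supplied callback (the name and condition here are
+ * hypothetical) might look like:
+ *
+ *		static bool
+ *		my_callback(UnpackedUndoRecord *urec, BlockNumber blkno,
+ *					OffsetNumber offset, TransactionId xid)
+ *		{
+ *			return urec->uur_block == blkno && urec->uur_offset == offset &&
+ *				(!TransactionIdIsValid(xid) || urec->uur_xid == xid);
+ *		}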
+ */
+UnpackedUndoRecord *
+UndoFetchRecord(UndoRecPtr urp, BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback)
+{
+	RelFileNode rnode,
+				prevrec_rnode = {0};
+	UnpackedUndoRecord *urec = NULL;
+	Buffer		buffer = InvalidBuffer;
+	int			logno;
+
+	if (urec_ptr_out)
+		*urec_ptr_out = InvalidUndoRecPtr;
+
+	/*
+	 * Allocate memory for holding the undo record; the caller is responsible
+	 * for freeing this memory.
+	 */
+	urec = palloc0(sizeof(UnpackedUndoRecord));
+	UndoRecPtrAssignRelFileNode(rnode, urp);
+
+	/* Find the undo record pointer we are interested in. */
+	while (true)
+	{
+		UndoLogControl *log;
+
+		logno = UndoRecPtrGetLogNo(urp);
+		log = UndoLogGet(logno, true);
+		if (log == NULL)
+		{
+			urp = InvalidUndoRecPtr;
+			break;
+		}
+
+		/*
+		 * Prevent UndoDiscardOneLog() from discarding data while we try to
+		 * read it.  Usually we would acquire log->mutex to read log->meta
+		 * members, but in this case we know that discard can't move without
+		 * also holding log->discard_lock.
+		 *
+		 * In Hot Standby mode log->oldest_data is never initialized, because
+		 * it is normally updated by the undo discard worker, whereas in Hot
+		 * Standby the undo logs are discarded by replaying discard WAL.  So
+		 * in Hot Standby we can directly check whether the undo record
+		 * pointer is discarded.  But we cannot do the same in the normal
+		 * case because the discard worker can concurrently discard the undo
+		 * logs.
+		 *
+		 * XXX We could avoid this check by initializing log->oldest_data in
+		 * Hot Standby mode as well, whenever we apply discard WAL.  But for
+		 * that we would need to acquire the discard lock just to set this
+		 * variable.
+		 */
+		LWLockAcquire(&log->discard_lock, LW_SHARED);
+		if (InHotStandby)
+		{
+			if (UndoLogIsDiscarded(urp))
+			{
+				LWLockRelease(&log->discard_lock);
+				urp = InvalidUndoRecPtr;
+				break;
+			}
+		}
+		else if (urp < log->oldest_data)
+		{
+			LWLockRelease(&log->discard_lock);
+			urp = InvalidUndoRecPtr;
+			break;
+		}
+
+		/* Fetch the current undo record. */
+		UndoGetOneRecord(urec, urp, rnode, log->meta.persistence, &buffer);
+
+		/* Release the discard lock after fetching the record. */
+		LWLockRelease(&log->discard_lock);
+
+		if (blkno == InvalidBlockNumber)
+			break;
+
+		/* Check whether the undo record satisfies conditions */
+		if (callback(urec, blkno, offset, xid))
+			break;
+
+		urp = urec->uur_blkprev;
+		prevrec_rnode = rnode;
+
+		/* Get rnode for the current undo record pointer. */
+		UndoRecPtrAssignRelFileNode(rnode, urp);
+
+		/* Reset the current undo record before fetching the next. */
+		ResetUndoRecord(urec, urp, &rnode, &prevrec_rnode, &buffer);
+	}
+
+	/*
+	 * If we have not found any valid undo record then it might have already
+	 * been discarded, so release the memory we allocated for the unpacked
+	 * undo record and set urec to NULL.
+	 */
+	if (!UndoRecPtrIsValid(urp))
+	{
+		pfree(urec);
+		urec = NULL;
+	}
+	else if (urec_ptr_out != NULL)
+		*urec_ptr_out = urp;
+
+	/* Release the last buffer. */
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	return urec;
+}
+
+/*
+ * Prefetch undo pages if prefetch_pages is behind prefetch_target.
+ */
+static void
+PrefetchUndoPages(RelFileNode rnode, int prefetch_target, int *prefetch_pages,
+				  BlockNumber to_blkno, BlockNumber from_blkno,
+				  char persistence)
+{
+	int			nprefetch;
+	BlockNumber startblock;
+	BlockNumber lastprefetched;
+
+	/* Calculate last prefetched page in the previous iteration. */
+	lastprefetched = from_blkno - *prefetch_pages;
+
+	/* We have already prefetched all the pages of the transaction's undo. */
+	if (lastprefetched <= to_blkno)
+		return;
+
+	/* Calculate number of blocks to be prefetched. */
+	nprefetch =
+		Min(prefetch_target - *prefetch_pages, lastprefetched - to_blkno);
+
+	/* Where to start prefetch. */
+	startblock = lastprefetched - nprefetch;
+
+	while (nprefetch--)
+	{
+		PrefetchBufferWithoutRelcache(SMGR_UNDO, rnode, UndoLogForkNum,
+									  startblock++,
+									  RelPersistenceForUndoPersistence(persistence));
+		(*prefetch_pages)++;
+	}
+}
+
+/*
+ * Read undo records of the transaction in bulk
+ *
+ * Read undo records between from_urecptr and to_urecptr until we exhaust
+ * the memory size specified by undo_apply_size.  If we could not read all
+ * the records up to to_urecptr then the caller should consume the current
+ * set of records and call this function again.
+ *
+ * from_urecptr		- Where to start fetching the undo records.  If we can not
+ *					  read all the records because of memory limit then this
+ *					  will be set to the previous undo record pointer from where
+ *					  we need to start fetching on next call. Otherwise it will
+ *					  be set to InvalidUndoRecPtr.
+ * to_urecptr		- Last undo record pointer to be fetched.
+ * undo_apply_size	- Memory segment limit to collect undo records.
+ * nrecords			- Number of undo records read.
+ * one_page			- Caller is applying undo only for one block, not for the
+ *					  complete transaction.  If this is set true then instead
+ *					  of following the transaction's undo chain using prevlen
+ *					  we will follow the block's prev chain, so that we can
+ *					  avoid reading many unnecessary undo records of the
+ *					  transaction.
+ */
+UndoRecInfo *
+UndoBulkFetchRecord(UndoRecPtr *from_urecptr, UndoRecPtr to_urecptr,
+					int undo_apply_size, int *nrecords, bool one_page)
+{
+	RelFileNode rnode;
+	UndoRecPtr	urecptr,
+				prev_urec_ptr;
+	BlockNumber blkno;
+	BlockNumber to_blkno;
+	Buffer		buffer = InvalidBuffer;
+	UnpackedUndoRecord *uur = NULL;
+	UndoRecInfo *urp_array;
+	int			urp_array_size = 1024;
+	int			urp_index = 0;
+	int			prefetch_target = 0;
+	int			prefetch_pages = 0;
+	Size		total_size = 0;
+	TransactionId xid = InvalidTransactionId;
+
+	/*
+	 * In one_page mode we are fetching undo only for one page instead of
+	 * fetching all the undo of the transaction.  Basically, we are fetching
+	 * interleaved undo records.  So it does not make sense to do any prefetch
+	 * in that case.
+	 */
+	if (!one_page)
+		prefetch_target = target_prefetch_pages;
+
+	/*
+	 * Allocate initial memory to hold the undo record info; we can expand it
+	 * if needed.
+	 */
+	urp_array = (UndoRecInfo *) palloc(sizeof(UndoRecInfo) * urp_array_size);
+	urecptr = *from_urecptr;
+
+	prev_urec_ptr = InvalidUndoRecPtr;
+	*from_urecptr = InvalidUndoRecPtr;
+
+	/* Read the undo chain backward until we reach the first undo record. */
+	while (1)
+	{
+		BlockNumber from_blkno;
+		UndoLogControl *log;
+		UndoPersistence persistence;
+		int			size;
+		int			logno;
+
+		logno = UndoRecPtrGetLogNo(urecptr);
+		log = UndoLogGet(logno, true);
+		if (log == NULL)
+		{
+			if (BufferIsValid(buffer))
+				UnlockReleaseBuffer(buffer);
+			return NULL;
+		}
+		persistence = log->meta.persistence;
+
+		UndoRecPtrAssignRelFileNode(rnode, urecptr);
+		to_blkno = UndoRecPtrGetBlockNum(to_urecptr);
+		from_blkno = UndoRecPtrGetBlockNum(urecptr);
+
+		/* Allocate memory for next undo record. */
+		uur = palloc0(sizeof(UnpackedUndoRecord));
+
+		/*
+		 * If the next undo record pointer to be fetched is not on the same
+		 * block then release the old buffer and reduce the prefetch_pages
+		 * count by one, as we have consumed one page.  Otherwise, keep the
+		 * old buffer so that UndoGetOneRecord doesn't read the same buffer
+		 * again.
+		 */
+		blkno = UndoRecPtrGetBlockNum(urecptr);
+		if (!UndoRecPtrIsValid(prev_urec_ptr) ||
+			UndoRecPtrGetLogNo(prev_urec_ptr) != logno ||
+			UndoRecPtrGetBlockNum(prev_urec_ptr) != blkno)
+		{
+			/* Release the previous buffer */
+			if (BufferIsValid(buffer))
+			{
+				UnlockReleaseBuffer(buffer);
+				buffer = InvalidBuffer;
+			}
+
+			if (prefetch_pages > 0)
+				prefetch_pages--;
+		}
+
+		/*
+		 * If prefetch_pages has dropped below half of the prefetch_target
+		 * then it's time to prefetch again.
+		 */
+		if (prefetch_pages < prefetch_target / 2)
+			PrefetchUndoPages(rnode, prefetch_target, &prefetch_pages, to_blkno,
+							  from_blkno, persistence);
+
+		/*
+		 * In one_page mode it's possible that the transaction's undo might
+		 * have already been applied by the worker and then discarded.
+		 * Prevent the discard worker from discarding the undo data while we
+		 * are reading it; see the detailed comment in UndoFetchRecord.  In
+		 * normal mode we are holding the transaction's undo action lock, so
+		 * it cannot be discarded.
+		 */
+		if (one_page)
+		{
+			/* Refer comments in UndoFetchRecord. */
+			LWLockAcquire(&log->discard_lock, LW_SHARED);
+			if (InHotStandby)
+			{
+				if (UndoLogIsDiscarded(urecptr))
+				{
+					LWLockRelease(&log->discard_lock);
+					break;
+				}
+			}
+			else if (urecptr < log->oldest_data)
+			{
+				LWLockRelease(&log->discard_lock);
+				break;
+			}
+
+			/* Read the undo record. */
+			UndoGetOneRecord(uur, urecptr, rnode, persistence, &buffer);
+
+			/* Release the discard lock after fetching the record. */
+			LWLockRelease(&log->discard_lock);
+		}
+		else
+			UndoGetOneRecord(uur, urecptr, rnode, persistence, &buffer);
+
+		/*
+		 * As soon as the transaction id changes we can stop fetching undo
+		 * records.  Ideally, to_urecptr should control this, but while
+		 * reading undo only for a page we don't know the end undo record
+		 * pointer for the transaction.
+		 */
+		if (one_page)
+		{
+			if (!TransactionIdIsValid(xid))
+				xid = uur->uur_xid;
+			else if (xid != uur->uur_xid)
+				break;
+		}
+
+		/* Remember the previous undo record pointer. */
+		prev_urec_ptr = urecptr;
+
+		/*
+		 * Calculate the previous undo record pointer of the transaction.  If
+		 * we are reading undo only for a page then follow the blkprev chain
+		 * of the page.  Otherwise, calculate the previous undo record
+		 * pointer using the transaction's current undo record pointer and
+		 * the prevlen.  If the undo record has a valid uur_prevurp then the
+		 * undo log was switched during the transaction, so we can directly
+		 * use uur_prevurp as the transaction's previous undo record pointer.
+		 */
+		if (one_page)
+			urecptr = uur->uur_blkprev;
+		else if (prev_urec_ptr == to_urecptr ||
+				 uur->uur_info & UREC_INFO_TRANSACTION)
+			urecptr = InvalidUndoRecPtr;
+		else if (UndoRecPtrIsValid(uur->uur_prevurp))
+			urecptr = uur->uur_prevurp;
+		else
+			urecptr = UndoGetPrevUndoRecptr(prev_urec_ptr, buffer, persistence);
+
+		/* We have consumed all elements of the urp_array so expand its size. */
+		if (urp_index >= urp_array_size)
+		{
+			urp_array_size *= 2;
+			urp_array =
+				repalloc(urp_array, sizeof(UndoRecInfo) * urp_array_size);
+		}
+
+		/* Add entry in the urp_array */
+		urp_array[urp_index].index = urp_index;
+		urp_array[urp_index].urp = prev_urec_ptr;
+		urp_array[urp_index].uur = uur;
+		urp_index++;
+
+		/* We have fetched all the undo records for the transaction. */
+		if (!UndoRecPtrIsValid(urecptr) || (prev_urec_ptr == to_urecptr))
+			break;
+
+		/*
+		 * If, including the current record, we have crossed the memory limit
+		 * then stop processing more records.  Remember to set from_urecptr
+		 * so that on the next call we can resume fetching undo records where
+		 * we left off.
+		 */
+		size = UnpackedUndoRecordSize(uur);
+		total_size += size;
+
+		if (total_size >= undo_apply_size)
+		{
+			*from_urecptr = urecptr;
+			break;
+		}
+	}
+
+	/* Release the last buffer. */
+	if (BufferIsValid(buffer))
+		UnlockReleaseBuffer(buffer);
+
+	*nrecords = urp_index;
+
+	return urp_array;
+}
+
+/*
+ * Release the resources allocated by UndoFetchRecord.
+ */
+void
+UndoRecordRelease(UnpackedUndoRecord *urec)
+{
+	if (urec->uur_payload.data)
+		pfree(urec->uur_payload.data);
+	if (urec->uur_tuple.data)
+		pfree(urec->uur_tuple.data);
+
+	pfree(urec);
+}
+
+/*
+ * Register the undo buffers.
+ */
+void
+RegisterUndoLogBuffers(UndoRecordInsertContext *context, uint8 first_block_id)
+{
+	int			idx;
+	int			flags;
+
+	for (idx = 0; idx < context->nprepared_undo_buffer; idx++)
+	{
+		flags = context->prepared_undo_buffers[idx].zero
+			? REGBUF_KEEP_DATA_AFTER_CP | REGBUF_WILL_INIT
+			: REGBUF_KEEP_DATA_AFTER_CP;
+		XLogRegisterBuffer(first_block_id + idx,
+						   context->prepared_undo_buffers[idx].buf, flags);
+		UndoLogRegister(&context->alloc_context, first_block_id + idx,
+						context->prepared_undo_buffers[idx].logno);
+	}
+}
+
+/*
+ * Set the LSN on the undo pages.
+ */
+void
+UndoLogBuffersSetLSN(UndoRecordInsertContext *context, XLogRecPtr recptr)
+{
+	int			idx;
+
+	for (idx = 0; idx < context->nprepared_undo_buffer; idx++)
+		PageSetLSN(BufferGetPage(context->prepared_undo_buffers[idx].buf),
+				   recptr);
+}
+
+/*
+ * Read length of the previous undo record.
+ *
+ * This function takes an undo record pointer as input and reads the length
+ * of the previous undo record, which is stored at the end of that record.
+ * If the previous undo record is split across blocks then it adds the undo
+ * block header size to the total length.
+ */
+static uint16
+UndoGetPrevRecordLen(UndoRecPtr urp, Buffer input_buffer,
+					 UndoPersistence upersistence)
+{
+	UndoLogOffset page_offset = UndoRecPtrGetPageOffset(urp);
+	BlockNumber cur_blk = UndoRecPtrGetBlockNum(urp);
+	Buffer		buffer = input_buffer;
+	Page		page = NULL;
+	char	   *pagedata;
+	char		prevlen[2];
+	RelFileNode rnode;
+	int			byte_to_read = sizeof(uint16);
+	char		persistence;
+	uint16		prev_rec_len = 0;
+
+	/* Get relfilenode. */
+	UndoRecPtrAssignRelFileNode(rnode, urp);
+	persistence = RelPersistenceForUndoPersistence(upersistence);
+
+	if (BufferIsValid(buffer))
+	{
+		page = BufferGetPage(buffer);
+		pagedata = (char *) page;
+	}
+
+	/*
+	 * The length of the previous undo record is stored at the end of that
+	 * record, so just fetch the last 2 bytes.
+	 */
+	while (byte_to_read > 0)
+	{
+		/* Read buffer if the current buffer is not valid. */
+		if (!BufferIsValid(buffer))
+		{
+			buffer = ReadBufferWithoutRelcache(SMGR_UNDO, rnode, UndoLogForkNum,
+											   cur_blk, RBM_NORMAL, NULL,
+											   persistence);
+
+			LockBuffer(buffer, BUFFER_LOCK_SHARE);
+
+			page = BufferGetPage(buffer);
+			pagedata = (char *) page;
+		}
+
+		page_offset -= 1;
+
+		/*
+		 * Read the current prevlen byte from the current block if
+		 * page_offset hasn't reached the undo block header.  Otherwise, go
+		 * to the previous block and continue reading from there.
+		 */
+		if (page_offset >= UndoLogBlockHeaderSize)
+		{
+			prevlen[byte_to_read - 1] = pagedata[page_offset];
+			byte_to_read -= 1;
+		}
+		else
+		{
+			/*
+			 * Release the current buffer if it was not provided by the caller.
+			 */
+			if (input_buffer != buffer)
+				UnlockReleaseBuffer(buffer);
+
+			/*
+			 * Could not read the complete prevlen from the current block, so
+			 * go to the previous block and start reading from its end.
+			 */
+			cur_blk -= 1;
+			page_offset = BLCKSZ;
+
+			/*
+			 * Reset buffer so that we can read it again for the previous
+			 * block.
+			 */
+			buffer = InvalidBuffer;
+		}
+	}
+
+	prev_rec_len = *(uint16 *) (prevlen);
+
+	/*
+	 * If the previous undo record is not completely stored in this page then
+	 * add UndoLogBlockHeaderSize to the total length so that the caller can
+	 * use this length to compute the undo record pointer of the previous
+	 * undo record.
+	 */
+	if (UndoRecPtrGetPageOffset(urp) - UndoLogBlockHeaderSize < prev_rec_len)
+		prev_rec_len += UndoLogBlockHeaderSize;
+
+	/* Release the buffer if we have locally read it. */
+	if (input_buffer != buffer)
+		UnlockReleaseBuffer(buffer);
+
+	return prev_rec_len;
+}
+
+/*
+ * Calculate the previous undo record pointer for the transaction.
+ *
+ * This takes the transaction's current undo record pointer as input and
+ * calculates the transaction's previous undo record pointer.
+ */
+UndoRecPtr
+UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
+					  UndoPersistence upersistence)
+{
+	UndoLogNumber logno = UndoRecPtrGetLogNo(urp);
+	UndoLogOffset offset = UndoRecPtrGetOffset(urp);
+	uint16		prevlen;
+
+	/* Read length of the previous undo record. */
+	prevlen = UndoGetPrevRecordLen(urp, buffer, upersistence);
+
+	/* calculate the previous undo record pointer */
+	return MakeUndoRecPtr(logno, offset - prevlen);
+}
diff --git a/src/backend/access/undo/undorecord.c b/src/backend/access/undo/undorecord.c
new file mode 100644
index 0000000..7e585ac
--- /dev/null
+++ b/src/backend/access/undo/undorecord.c
@@ -0,0 +1,744 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.c
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorecord.c
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/subtrans.h"
+#include "access/undorecord.h"
+#include "catalog/pg_tablespace.h"
+#include "storage/block.h"
+
+/* Prototypes for static functions. */
+static bool InsertUndoBytes(char *sourceptr, int sourcelen,
+				char **writeptr, char *endptr,
+				int *total_bytes_written, int *partial_write);
+static bool ReadUndoBytes(char *destptr, int readlen,
+			  char **readptr, char *endptr,
+			  int *total_bytes_read, int *partial_read);
+
+/*
+ * Compute and return the expected size of an undo record.
+ */
+Size
+UndoRecordExpectedSize(UnpackedUndoRecord *uur)
+{
+	Size		size;
+
+	size = SizeOfUndoRecordHeader + sizeof(uint16);
+	if ((uur->uur_info & UREC_INFO_FORK) != 0)
+		size += sizeof(ForkNumber);
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+		size += SizeOfUndoRecordBlock;
+	if ((uur->uur_info & UREC_INFO_BLKPREV) != 0)
+		size += sizeof(UndoRecPtr);
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+		size += SizeOfUndoRecordTransaction;
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		size += SizeOfUndoRecordPayload;
+		size += uur->uur_payload.len;
+		size += uur->uur_tuple.len;
+	}
+
+	return size;
+}
+
+/*
+ * Compute the size of the unpacked undo record in memory.
+ */
+Size
+UnpackedUndoRecordSize(UnpackedUndoRecord *uur)
+{
+	Size		size;
+
+	size = sizeof(UnpackedUndoRecord);
+
+	/* Add payload size if record contains payload data. */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		size += uur->uur_payload.len;
+		size += uur->uur_tuple.len;
+	}
+
+	return size;
+}
+
+/*
+ * Initiate inserting an undo record.
+ *
+ * This function initializes the context for inserting an undo record,
+ * which will then be inserted by calling InsertUndoData.
+ */
+void
+BeginInsertUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur)
+{
+	ucontext->stage = UNDO_PACK_STAGE_HEADER;
+	ucontext->already_processed = 0;
+	ucontext->partial_bytes = 0;
+
+	/* Copy undo record header. */
+	ucontext->urec_hd.urec_rmid = uur->uur_rmid;
+	ucontext->urec_hd.urec_type = uur->uur_type;
+	ucontext->urec_hd.urec_info = uur->uur_info;
+	ucontext->urec_hd.urec_reloid = uur->uur_reloid;
+	ucontext->urec_hd.urec_xid = uur->uur_xid;
+	ucontext->urec_hd.urec_cid = uur->uur_cid;
+
+	/* Copy undo record relation header if it is present. */
+	if ((uur->uur_info & UREC_INFO_FORK) != 0)
+		ucontext->urec_fork = uur->uur_fork;
+
+	/* Copy undo record block header if it is present. */
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+	{
+		ucontext->urec_blk.urec_block = uur->uur_block;
+		ucontext->urec_blk.urec_offset = uur->uur_offset;
+	}
+
+	/* Copy undo record block prev if it is present. */
+	if ((uur->uur_info & UREC_INFO_BLKPREV) != 0)
+	{
+		ucontext->urec_blkprev = uur->uur_blkprev;
+	}
+
+	/* Copy undo record transaction header if it is present. */
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+	{
+		ucontext->urec_txn.urec_progress = uur->uur_progress;
+		ucontext->urec_txn.urec_xidepoch = uur->uur_xidepoch;
+		ucontext->urec_txn.urec_dbid = uur->uur_dbid;
+		ucontext->urec_txn.urec_prevurp = uur->uur_prevurp;
+		ucontext->urec_txn.urec_next = uur->uur_next;
+	}
+
+	/* Copy undo record payload header and data if it is present. */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		ucontext->urec_payload.urec_payload_len = uur->uur_payload.len;
+		ucontext->urec_payload.urec_tuple_len = uur->uur_tuple.len;
+		ucontext->urec_payloaddata = uur->uur_payload.data;
+		ucontext->urec_tupledata = uur->uur_tuple.data;
+	}
+
+	ucontext->undo_len = UndoRecordExpectedSize(uur);
+}
+
+/*
+ * Insert the undo record into the input page from the unpack undo context.
+ *
+ * The caller can call this function multiple times until the desired stage
+ * is reached.  This will write the undo record into the page.
+ */
+void
+InsertUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
+{
+	char	   *writeptr = (char *) page + starting_byte;
+	char	   *endptr = (char *) page + BLCKSZ;
+
+	switch (ucontext->stage)
+	{
+		case UNDO_PACK_STAGE_HEADER:
+			/* Insert undo record header. */
+			if (!InsertUndoBytes((char *) &ucontext->urec_hd,
+								 SizeOfUndoRecordHeader, &writeptr, endptr,
+								 &ucontext->already_processed,
+								 &ucontext->partial_bytes))
+				return;
+			ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_FORKNUM:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+			{
+				/* Insert undo record fork number. */
+				if (!InsertUndoBytes((char *) &ucontext->urec_fork,
+									 sizeof(ForkNumber),
+									 &writeptr, endptr,
+									 &ucontext->already_processed,
+									 &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_BLOCK:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+			{
+				/* Insert undo record block header. */
+				if (!InsertUndoBytes((char *) &ucontext->urec_blk,
+									 SizeOfUndoRecordBlock,
+									 &writeptr, endptr,
+									 &ucontext->already_processed,
+									 &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_BLOCKPREV;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_BLOCKPREV:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_BLKPREV) != 0)
+			{
+				/* Insert undo record blkprev. */
+				if (!InsertUndoBytes((char *) &ucontext->urec_blkprev,
+									 sizeof(UndoRecPtr),
+									 &writeptr, endptr,
+									 &ucontext->already_processed,
+									 &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_TRANSACTION:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+			{
+				/* Insert undo record transaction header. */
+				if (!InsertUndoBytes((char *) &ucontext->urec_txn,
+									 SizeOfUndoRecordTransaction,
+									 &writeptr, endptr,
+									 &ucontext->already_processed,
+									 &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_PAYLOAD:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+			{
+				/* Insert undo record payload header. */
+				if (!InsertUndoBytes((char *) &ucontext->urec_payload,
+									 SizeOfUndoRecordPayload,
+									 &writeptr, endptr,
+									 &ucontext->already_processed,
+									 &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_PAYLOAD_DATA:
+			{
+				int			len = ucontext->urec_payload.urec_payload_len;
+
+				if (len > 0)
+				{
+					/* Insert payload data. */
+					if (!InsertUndoBytes((char *) ucontext->urec_payloaddata,
+										 len, &writeptr, endptr,
+										 &ucontext->already_processed,
+										 &ucontext->partial_bytes))
+						return;
+				}
+				ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+			}
+			/* fall through */
+
+		case UNDO_PACK_STAGE_TUPLE_DATA:
+			{
+				int			len = ucontext->urec_payload.urec_tuple_len;
+
+				if (len > 0)
+				{
+					/* Insert tuple data. */
+					if (!InsertUndoBytes((char *) ucontext->urec_tupledata,
+										 len, &writeptr, endptr,
+										 &ucontext->already_processed,
+										 &ucontext->partial_bytes))
+						return;
+				}
+				ucontext->stage = UNDO_PACK_STAGE_UNDO_LENGTH;
+			}
+			/* fall through */
+
+		case UNDO_PACK_STAGE_UNDO_LENGTH:
+			/* Insert undo length. */
+			if (!InsertUndoBytes((char *) &ucontext->undo_len,
+								 sizeof(uint16), &writeptr, endptr,
+								 &ucontext->already_processed,
+								 &ucontext->partial_bytes))
+				return;
+
+			ucontext->stage = UNDO_PACK_STAGE_DONE;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_DONE:
+			/* Nothing to be done. */
+			break;
+
+		default:
+			Assert(0);			/* Invalid stage */
+	}
+}
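
As a usage sketch (not part of this patch): a caller is expected to drive the
staged insert one page at a time until the context reaches
UNDO_PACK_STAGE_DONE.  Here get_next_undo_page and UNDO_PAGE_HDR_SZ are
hypothetical stand-ins for the buffer lookup and the per-page header size
provided by the undo log storage layer:

	UndoPackContext ucontext;
	int			starting_byte = first_byte; /* record's offset in its first page */

	BeginInsertUndo(&ucontext, uur);
	do
	{
		/* Hypothetical helper returning the next pinned and locked undo page. */
		Page		page = get_next_undo_page();

		InsertUndoData(&ucontext, page, starting_byte);

		/* Any continuation starts just past the next page's header. */
		starting_byte = UNDO_PAGE_HDR_SZ;
	} while (ucontext.stage != UNDO_PACK_STAGE_DONE);
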
+
+/*
+ * Skip inserting undo record
+ *
+ * Don't insert the actual undo record data; instead, just advance the context
+ * as if the bytes had been written, so that if the remaining partial record
+ * must be inserted into the next block, we have the right context.
+ */
+void
+SkipInsertingUndoData(UndoPackContext *ucontext, int bytes_to_skip)
+{
+	switch (ucontext->stage)
+	{
+		case UNDO_PACK_STAGE_HEADER:
+			if (bytes_to_skip < SizeOfUndoRecordHeader)
+			{
+				ucontext->partial_bytes = bytes_to_skip;
+				return;
+			}
+			bytes_to_skip -= SizeOfUndoRecordHeader;
+			ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_FORKNUM:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+			{
+				if (bytes_to_skip < sizeof(ForkNumber))
+				{
+					ucontext->partial_bytes = bytes_to_skip;
+					return;
+				}
+				bytes_to_skip -= sizeof(ForkNumber);
+			}
+
+			ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_BLOCK:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+			{
+				if (bytes_to_skip < SizeOfUndoRecordBlock)
+				{
+					ucontext->partial_bytes = bytes_to_skip;
+					return;
+				}
+				bytes_to_skip -= SizeOfUndoRecordBlock;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_BLOCKPREV;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_BLOCKPREV:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_BLKPREV) != 0)
+			{
+				if (bytes_to_skip < sizeof(UndoRecPtr))
+				{
+					ucontext->partial_bytes = bytes_to_skip;
+					return;
+				}
+				bytes_to_skip -= sizeof(UndoRecPtr);
+			}
+			ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_TRANSACTION:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+			{
+				if (bytes_to_skip < SizeOfUndoRecordTransaction)
+				{
+					ucontext->partial_bytes = bytes_to_skip;
+					return;
+				}
+				bytes_to_skip -= SizeOfUndoRecordTransaction;
+			}
+
+			ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_PAYLOAD:
+			/* Skip payload header. */
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+			{
+				if (bytes_to_skip < SizeOfUndoRecordPayload)
+				{
+					ucontext->partial_bytes = bytes_to_skip;
+					return;
+				}
+				bytes_to_skip -= SizeOfUndoRecordPayload;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_PAYLOAD_DATA:
+			if (ucontext->urec_payload.urec_payload_len > 0)
+			{
+				if (bytes_to_skip < ucontext->urec_payload.urec_payload_len)
+				{
+					ucontext->partial_bytes = bytes_to_skip;
+					return;
+				}
+				bytes_to_skip -= ucontext->urec_payload.urec_payload_len;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_TUPLE_DATA:
+			if (ucontext->urec_payload.urec_tuple_len > 0)
+			{
+				if (bytes_to_skip < ucontext->urec_payload.urec_tuple_len)
+				{
+					ucontext->partial_bytes = bytes_to_skip;
+					return;
+				}
+				bytes_to_skip -= ucontext->urec_payload.urec_tuple_len;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_UNDO_LENGTH;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_UNDO_LENGTH:
+			ucontext->stage = UNDO_PACK_STAGE_DONE;
+			/* fall through */
+
+		case UNDO_PACK_STAGE_DONE:
+			/* Nothing to be done. */
+			break;
+
+		default:
+			Assert(0);			/* Invalid stage */
+	}
+}
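
A sketch of the intended call pattern (assumed, not taken from a caller in
this patch): when, say, the first 20 bytes of the record already exist at the
end of the previous page, fast-forward a freshly initialized context past
those bytes and then continue writing for real on the current page:

	BeginInsertUndo(&ucontext, uur);

	/* Pretend the first 20 bytes were written; only bookkeeping is updated. */
	SkipInsertingUndoData(&ucontext, 20);

	/* Write the remainder, starting past this page's header (hypothetical size). */
	InsertUndoData(&ucontext, page, UNDO_PAGE_HDR_SZ);
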
+
+/*
+ * Write undo bytes from a particular source, but only to the extent that
+ * they weren't written previously and will fit.
+ *
+ * 'sourceptr' points to the source data, and 'sourcelen' is the length of
+ * that data in bytes.
+ *
+ * 'writeptr' points to the insertion point for these bytes, and is updated
+ * for whatever we write.  The insertion point must not pass 'endptr', which
+ * represents the end of the buffer into which we are writing.
+ *
+ * 'total_bytes_written' points to the count of all previously-written bytes
+ * for this undo record, and is updated for the bytes we write.
+ *
+ * 'partial_write' points to the count of bytes of this structure that were
+ * written by a previous call (any bytes belonging to earlier structures in
+ * the record have already been accounted for); it is updated here, and
+ * reset to zero once the whole structure has been written.
+ *
+ * The return value is false if we ran out of space before writing all
+ * the bytes, and otherwise true.
+ */
+static bool
+InsertUndoBytes(char *sourceptr, int sourcelen, char **writeptr, char *endptr,
+				int *total_bytes_written, int *partial_write)
+{
+	int			can_write;
+	int			remaining;
+
+	/* Compute number of bytes we can write. */
+	remaining = sourcelen - *partial_write;
+	can_write = Min(remaining, endptr - *writeptr);
+
+	/* Bail out if no bytes can be written. */
+	if (can_write == 0)
+		return false;
+
+	/* Copy the bytes we can write. */
+	memcpy(*writeptr, sourceptr + *partial_write, can_write);
+
+	/* Update bookkeeping information. */
+	*writeptr += can_write;
+	*total_bytes_written += can_write;
+
+	/* We could not write all the data, so remember the partial write. */
+	if (can_write < remaining)
+	{
+		*partial_write += can_write;
+		return false;
+	}
+
+	/* Return true only if we wrote the whole thing. */
+	*partial_write = 0;
+	return true;
+}
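
To make the resume semantics concrete, here is an illustrative sequence (it
would have to live in this file, since InsertUndoBytes is static); the
10-byte source and 6-byte "pages" are arbitrary:

	char		src[] = "0123456789";	/* 10 bytes to write */
	char		page1[6];
	char		page2[6];
	char	   *wp = page1;
	int			written = 0;
	int			partial = 0;

	/* Fills page1 with bytes 0..5, sets partial to 6, and returns false. */
	if (!InsertUndoBytes(src, 10, &wp, page1 + sizeof(page1),
						 &written, &partial))
	{
		wp = page2;

		/* Resumes at src + partial, writes bytes 6..9, and returns true. */
		(void) InsertUndoBytes(src, 10, &wp, page2 + sizeof(page2),
							   &written, &partial);
	}
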
+
+/*
+ * Initiate unpacking an undo record.
+ *
+ * This function initializes the context that UnpackUndoData will use while
+ * unpacking the undo record.
+ */
+void
+BeginUnpackUndo(UndoPackContext *ucontext)
+{
+	ucontext->stage = UNDO_PACK_STAGE_HEADER;
+	ucontext->already_processed = 0;
+	ucontext->partial_bytes = 0;
+
+	/*
+	 * Start with NULL data pointers so that UnpackUndoData knows it must
+	 * allocate space for the payload and tuple data.
+	 */
+	ucontext->urec_payloaddata = NULL;
+	ucontext->urec_tupledata = NULL;
+}
+
+/*
+ * Read the undo record from the input page to the unpack undo context.
+ *
+ * The caller can call this function multiple times, once per page that the
+ * record spans, until the read is complete.  The data are accumulated in the
+ * unpack undo context and can later be copied into an unpacked undo record
+ * by calling FinishUnpackUndo.
+ */
+void
+UnpackUndoData(UndoPackContext *ucontext, Page page, int starting_byte)
+{
+	char	   *readptr = (char *) page + starting_byte;
+	char	   *endptr = (char *) page + BLCKSZ;
+
+	switch (ucontext->stage)
+	{
+		case UNDO_PACK_STAGE_HEADER:
+			if (!ReadUndoBytes((char *) &ucontext->urec_hd,
+							   SizeOfUndoRecordHeader, &readptr, endptr,
+							   &ucontext->already_processed,
+							   &ucontext->partial_bytes))
+				return;
+			ucontext->stage = UNDO_PACK_STAGE_FORKNUM;
+			/* fall through */
+		case UNDO_PACK_STAGE_FORKNUM:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_FORK) != 0)
+			{
+				if (!ReadUndoBytes((char *) &ucontext->urec_fork,
+								   sizeof(ForkNumber),
+								   &readptr, endptr, &ucontext->already_processed,
+								   &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_BLOCK;
+			/* fall through */
+		case UNDO_PACK_STAGE_BLOCK:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_BLOCK) != 0)
+			{
+				if (!ReadUndoBytes((char *) &ucontext->urec_blk,
+								   SizeOfUndoRecordBlock,
+								   &readptr, endptr, &ucontext->already_processed,
+								   &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_BLOCKPREV;
+			/* fall through */
+		case UNDO_PACK_STAGE_BLOCKPREV:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_BLKPREV) != 0)
+			{
+				if (!ReadUndoBytes((char *) &ucontext->urec_blkprev,
+								   sizeof(UndoRecPtr),
+								   &readptr, endptr, &ucontext->already_processed,
+								   &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_TRANSACTION;
+			/* fall through */
+		case UNDO_PACK_STAGE_TRANSACTION:
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_TRANSACTION) != 0)
+			{
+				if (!ReadUndoBytes((char *) &ucontext->urec_txn,
+								   SizeOfUndoRecordTransaction,
+								   &readptr, endptr, &ucontext->already_processed,
+								   &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_PAYLOAD;
+			/* fall through */
+		case UNDO_PACK_STAGE_PAYLOAD:
+			/* Read payload header. */
+			if ((ucontext->urec_hd.urec_info & UREC_INFO_PAYLOAD) != 0)
+			{
+				if (!ReadUndoBytes((char *) &ucontext->urec_payload,
+								   SizeOfUndoRecordPayload,
+								   &readptr, endptr, &ucontext->already_processed,
+								   &ucontext->partial_bytes))
+					return;
+			}
+			ucontext->stage = UNDO_PACK_STAGE_PAYLOAD_DATA;
+			/* fall through */
+		case UNDO_PACK_STAGE_PAYLOAD_DATA:
+			{
+				int			len = ucontext->urec_payload.urec_payload_len;
+
+				/* Allocate memory for the payload data if not already done. */
+				if (len > 0)
+				{
+					if (ucontext->urec_payloaddata == NULL)
+						ucontext->urec_payloaddata = (char *) palloc(len);
+
+					/* Read payload data. */
+					if (!ReadUndoBytes((char *) ucontext->urec_payloaddata, len,
+									   &readptr, endptr, &ucontext->already_processed,
+									   &ucontext->partial_bytes))
+						return;
+				}
+				ucontext->stage = UNDO_PACK_STAGE_TUPLE_DATA;
+			}
+			/* fall through */
+		case UNDO_PACK_STAGE_TUPLE_DATA:
+			{
+				int			len = ucontext->urec_payload.urec_tuple_len;
+
+				/* Allocate memory for the tuple data if not already done. */
+				if (len > 0)
+				{
+					if (ucontext->urec_tupledata == NULL)
+						ucontext->urec_tupledata = (char *) palloc(len);
+
+					/* Read tuple data. */
+					if (!ReadUndoBytes((char *) ucontext->urec_tupledata, len,
+									   &readptr, endptr, &ucontext->already_processed,
+									   &ucontext->partial_bytes))
+						return;
+				}
+
+				ucontext->stage = UNDO_PACK_STAGE_DONE;
+			}
+			/* fall through */
+		case UNDO_PACK_STAGE_DONE:
+			/* Nothing to be done. */
+			break;
+		default:
+			Assert(0);			/* Invalid stage */
+	}
+}
+
+/*
+ * Final step of unpacking the undo record.
+ *
+ * Copy the undo record data from the unpack undo context to the input unpacked
+ * undo record.
+ */
+void
+FinishUnpackUndo(UndoPackContext *ucontext, UnpackedUndoRecord *uur)
+{
+	/* Copy undo record header. */
+	uur->uur_rmid = ucontext->urec_hd.urec_rmid;
+	uur->uur_type = ucontext->urec_hd.urec_type;
+	uur->uur_info = ucontext->urec_hd.urec_info;
+	uur->uur_reloid = ucontext->urec_hd.urec_reloid;
+	uur->uur_xid = ucontext->urec_hd.urec_xid;
+	uur->uur_cid = ucontext->urec_hd.urec_cid;
+
+	/* Copy undo record relation header if it is present. */
+	if ((uur->uur_info & UREC_INFO_FORK) != 0)
+		uur->uur_fork = ucontext->urec_fork;
+
+	/* Copy undo record block header if it is present. */
+	if ((uur->uur_info & UREC_INFO_BLOCK) != 0)
+	{
+		uur->uur_block = ucontext->urec_blk.urec_block;
+		uur->uur_offset = ucontext->urec_blk.urec_offset;
+	}
+
+	/* Copy undo record block prev header if it is present. */
+	if ((uur->uur_info & UREC_INFO_BLKPREV) != 0)
+	{
+		uur->uur_blkprev = ucontext->urec_blkprev;
+	}
+
+	/* Copy undo record transaction header if it is present. */
+	if ((uur->uur_info & UREC_INFO_TRANSACTION) != 0)
+	{
+		uur->uur_progress = ucontext->urec_txn.urec_progress;
+		uur->uur_xidepoch = ucontext->urec_txn.urec_xidepoch;
+		uur->uur_dbid = ucontext->urec_txn.urec_dbid;
+		uur->uur_prevurp = ucontext->urec_txn.urec_prevurp;
+		uur->uur_next = ucontext->urec_txn.urec_next;
+	}
+
+	/* Copy undo record payload header and data if it is present. */
+	if ((uur->uur_info & UREC_INFO_PAYLOAD) != 0)
+	{
+		uur->uur_payload.len = ucontext->urec_payload.urec_payload_len;
+		uur->uur_tuple.len = ucontext->urec_payload.urec_tuple_len;
+
+		/* Read payload data if its length is not 0. */
+		if (uur->uur_payload.len != 0)
+			uur->uur_payload.data = ucontext->urec_payloaddata;
+
+		/* Read tuple data if its length is not 0. */
+		if (uur->uur_tuple.len != 0)
+			uur->uur_tuple.data = ucontext->urec_tupledata;
+	}
+}
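
The read side mirrors the insert loop.  A hedged sketch, reusing the
hypothetical get_next_undo_page and UNDO_PAGE_HDR_SZ placeholders from the
insert example:

	UndoPackContext ucontext;
	UnpackedUndoRecord uur;
	int			starting_byte = first_byte; /* record's offset in its first page */

	BeginUnpackUndo(&ucontext);
	do
	{
		/* Hypothetical helper returning the next pinned, share-locked page. */
		Page		page = get_next_undo_page();

		UnpackUndoData(&ucontext, page, starting_byte);
		starting_byte = UNDO_PAGE_HDR_SZ;
	} while (ucontext.stage != UNDO_PACK_STAGE_DONE);

	FinishUnpackUndo(&ucontext, &uur);
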
+
+/*
+ * Read undo bytes into a particular destination, but only to the extent
+ * that they weren't read previously and are available in this buffer.
+ *
+ * 'destptr' points to the destination buffer, and 'readlen' is the number
+ * of bytes to be read into it.
+ *
+ * 'readptr' points to the read point for these bytes, and is updated for
+ * however much we read.  The read point must not pass 'endptr', which
+ * represents the end of the buffer from which we are reading.
+ *
+ * 'total_bytes_read' points to the count of all previously-read bytes,
+ * and is updated for the bytes we read.
+ *
+ * 'partial_read' points to the count of bytes of this structure that were
+ * read by a previous call; it is updated here, and reset to zero once the
+ * whole structure has been read.
+ *
+ * The return value is false if we ran out of data before reading all
+ * the bytes, and otherwise true.
+ */
+static bool
+ReadUndoBytes(char *destptr, int readlen, char **readptr, char *endptr,
+			  int *total_bytes_read, int *partial_read)
+{
+	int			can_read;
+	int			remaining;
+
+	/* Compute number of bytes we can read. */
+	remaining = readlen - *partial_read;
+	can_read = Min(remaining, endptr - *readptr);
+
+	/* Bail out if no bytes can be read. */
+	if (can_read == 0)
+		return false;
+
+	/* Copy the bytes we can read. */
+	memcpy(destptr + *partial_read, *readptr, can_read);
+
+	/* Update bookkeeping information. */
+	*readptr += can_read;
+	*total_bytes_read += can_read;
+
+	/* We could not read all the data, so remember the partial read. */
+	if (can_read < remaining)
+	{
+		*partial_read += can_read;
+		return false;
+	}
+
+	/* Return true only if we read the whole thing. */
+	*partial_read = 0;
+
+	return true;
+}
+
+/*
+ * Set uur_info for an UnpackedUndoRecord appropriately based on which
+ * other fields are set.
+ */
+void
+UndoRecordSetInfo(UnpackedUndoRecord *uur)
+{
+	if (uur->uur_fork != MAIN_FORKNUM)
+		uur->uur_info |= UREC_INFO_FORK;
+	if (uur->uur_block != InvalidBlockNumber)
+		uur->uur_info |= UREC_INFO_BLOCK;
+	if (uur->uur_blkprev != InvalidUndoRecPtr)
+		uur->uur_info |= UREC_INFO_BLKPREV;
+	if (uur->uur_next != InvalidUndoRecPtr)
+		uur->uur_info |= UREC_INFO_TRANSACTION;
+	if (uur->uur_payload.len || uur->uur_tuple.len)
+		uur->uur_info |= UREC_INFO_PAYLOAD;
+}
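
A minimal sketch of how uur_info is derived before packing; the field values
are arbitrary and only chosen to show which optional headers get selected:

	UnpackedUndoRecord uur = {0};

	uur.uur_fork = MAIN_FORKNUM;	/* main fork: UREC_INFO_FORK not set */
	uur.uur_block = 42;				/* valid block: UREC_INFO_BLOCK set */
	uur.uur_blkprev = InvalidUndoRecPtr;	/* no block chain */
	uur.uur_next = InvalidUndoRecPtr;	/* not a transaction-header record */

	UndoRecordSetInfo(&uur);		/* uur_info is now UREC_INFO_BLOCK */
	elog(DEBUG1, "undo record needs %zu bytes",
		 UndoRecordExpectedSize(&uur));
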
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 7966a9e..592c338 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -47,6 +47,7 @@
 #define EpochFromFullTransactionId(x)	((uint32) ((x).value >> 32))
 #define XidFromFullTransactionId(x)		((uint32) (x).value)
 #define U64FromFullTransactionId(x)		((x).value)
+#define FullTransactionIdEquals(a, b)	((a).value == (b).value)
 #define FullTransactionIdPrecedes(a, b)	((a).value < (b).value)
 #define FullTransactionIdIsValid(x)		TransactionIdIsValid(XidFromFullTransactionId(x))
 #define InvalidFullTransactionId		FullTransactionIdFromEpochAndXid(0, InvalidTransactionId)
diff --git a/src/include/access/undoaccess.h b/src/include/access/undoaccess.h
new file mode 100644
index 0000000..3e2154b
--- /dev/null
+++ b/src/include/access/undoaccess.h
@@ -0,0 +1,115 @@
+/*-------------------------------------------------------------------------
+ *
+ * undoaccess.h
+ *	  entry points for inserting/fetching undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undoaccess.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOACCESS_H
+#define UNDOACCESS_H
+
+#include "access/undolog.h"
+#include "access/undorecord.h"
+#include "access/xlogdefs.h"
+#include "catalog/pg_class.h"
+
+/* undo record information */
+typedef struct UndoRecInfo
+{
+	int			index;			/* Index of the element. */
+	UndoRecPtr	urp;			/* undo recptr (undo record location). */
+	UnpackedUndoRecord *uur;	/* actual undo record. */
+} UndoRecInfo;
+
+/*
+ * Typedef for callback function for UndoFetchRecord.
+ *
+ * This checks whether an undo record satisfies the given conditions.
+ */
+typedef bool (*SatisfyUndoRecordCallback) (UnpackedUndoRecord *urec,
+										   BlockNumber blkno,
+										   OffsetNumber offset,
+										   TransactionId xid);
+
+/*
+ * XXX Do we want to support an undo tuple size of more than BLCKSZ?  If not,
+ * then an undo record can span at most two buffers.
+ */
+#define MAX_BUFFER_PER_UNDO    2
+
+/*
+ * Maximum number of XactUndoRecordInfo entries used for updating transaction
+ * headers.  Usually it's 1, to update the next-record link in the previous
+ * transaction's header when starting a new transaction.  But when a
+ * transaction spills to the next undo log, we update both our own
+ * transaction's header in the previous undo log and the header of the
+ * previous transaction in the new log.
+ */
+#define MAX_XACT_UNDO_INFO	2
+
+typedef struct PreparedUndoSpace PreparedUndoSpace;
+typedef struct PreparedUndoBuffer PreparedUndoBuffer;
+
+/*
+ * This structure holds the information needed to update a transaction's undo
+ * record header (the first undo record of the transaction).  We update the
+ * transaction header for two purposes: (a) to set the next undo record
+ * pointer that maintains the chain of transactions inside an undo log, and
+ * (b) to record the undo apply progress.  During the prepare phase we keep
+ * all this information handy in this structure; it is then used to update
+ * the actual record inside the critical section.
+ */
+typedef struct XactUndoRecordInfo
+{
+	UndoRecPtr	urecptr;		/* txn's start urecptr */
+	int			idx_undo_buffers[MAX_BUFFER_PER_UNDO];
+	UnpackedUndoRecord uur;		/* undo record header */
+} XactUndoRecordInfo;
+
+/*
+ * Context for preparing and inserting undo records.
+ */
+typedef struct UndoRecordInsertContext
+{
+	UndoLogAllocContext alloc_context;
+	PreparedUndoSpace *prepared_undo;	/* prepared undo. */
+	PreparedUndoBuffer *prepared_undo_buffers;	/* Buffers for prepared undo. */
+	XactUndoRecordInfo xact_urec_info[MAX_XACT_UNDO_INFO]; /* Information for
+															 * updating transaction
+															 * headers. */
+	int			nprepared_undo; /* Number of prepared undo records. */
+	int			max_prepared_undo;	/* Max prepared undo for this operation. */
+	int			nprepared_undo_buffer;	/* Number of undo buffers. */
+	int			nxact_urec_info;	/* Number of previous xact info. */
+} UndoRecordInsertContext;
+
+extern void BeginUndoRecordInsert(UndoRecordInsertContext *context,
+					  UndoPersistence persistence,
+					  int nprepared,
+					  XLogReaderState *xlog_record);
+extern UndoRecPtr PrepareUndoInsert(UndoRecordInsertContext *context,
+				  UnpackedUndoRecord *urec, FullTransactionId fxid);
+extern void InsertPreparedUndo(UndoRecordInsertContext *context);
+extern void FinishUndoRecordInsert(UndoRecordInsertContext *context);
+extern void RegisterUndoLogBuffers(UndoRecordInsertContext *context,
+					   uint8 first_block_id);
+extern void UndoLogBuffersSetLSN(UndoRecordInsertContext *context,
+					 XLogRecPtr recptr);
+extern UnpackedUndoRecord *UndoFetchRecord(UndoRecPtr urp,
+				BlockNumber blkno, OffsetNumber offset,
+				TransactionId xid, UndoRecPtr *urec_ptr_out,
+				SatisfyUndoRecordCallback callback);
+extern UndoRecInfo *UndoBulkFetchRecord(UndoRecPtr *from_urecptr,
+					UndoRecPtr to_urecptr,
+					int undo_apply_size, int *nrecords,
+					bool one_page);
+extern void UndoRecordRelease(UnpackedUndoRecord *urec);
+extern UndoRecPtr UndoGetPrevUndoRecptr(UndoRecPtr urp, Buffer buffer,
+					  UndoPersistence upersistence);
+
+#endif							/* UNDOACCESS_H */
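
As a hedged sketch of how a caller might use the fetch API declared above
(the callback contract is inferred from the typedef; none of this is code
from the patch):

	/* Accept only undo records that describe the requested tuple version. */
	static bool
	undo_rec_matches(UnpackedUndoRecord *urec, BlockNumber blkno,
					 OffsetNumber offset, TransactionId xid)
	{
		return urec->uur_block == blkno &&
			urec->uur_offset == offset &&
			TransactionIdEquals(urec->uur_xid, xid);
	}

	/* ... in the caller ... */
	UndoRecPtr	found_urp;
	UnpackedUndoRecord *uur = UndoFetchRecord(urp, blkno, offset, xid,
											  &found_urp, undo_rec_matches);

	if (uur != NULL)
	{
		/* Consume uur->uur_payload / uur->uur_tuple here. */
		UndoRecordRelease(uur);
	}
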
diff --git a/src/include/access/undorecord.h b/src/include/access/undorecord.h
new file mode 100644
index 0000000..4ce1e76
--- /dev/null
+++ b/src/include/access/undorecord.h
@@ -0,0 +1,231 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorecord.h
+ *	  encode and decode undo records
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorecord.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDORECORD_H
+#define UNDORECORD_H
+
+#include "access/undolog.h"
+#include "lib/stringinfo.h"
+#include "storage/block.h"
+#include "storage/bufpage.h"
+#include "storage/buf.h"
+#include "storage/off.h"
+
+
+/*
+ * Every undo record begins with an UndoRecordHeader structure, which is
+ * followed by the additional structures indicated by the contents of
+ * urec_info.  All structures are stored without any alignment padding, and
+ * the undo record itself need not be aligned either, so care must be taken
+ * when reading the header.
+ */
+typedef struct UndoRecordHeader
+{
+	RmgrId		urec_rmid;		/* RMGR [XXX:TODO: this creates an alignment
+								 * hole?] */
+	uint8		urec_type;		/* record type code */
+	uint8		urec_info;		/* flag bits */
+	Oid			urec_reloid;	/* relation OID */
+
+	/*
+	 * Transaction id that modified the tuple for which this undo record is
+	 * written.  We use this to skip undo records.  See the comments atop
+	 * function UndoFetchRecord.
+	 */
+	TransactionId urec_xid;		/* Transaction id */
+	CommandId	urec_cid;		/* command id */
+} UndoRecordHeader;
+
+#define SizeOfUndoRecordHeader	\
+	(offsetof(UndoRecordHeader, urec_cid) + sizeof(CommandId))
+
+/*
+ * If UREC_INFO_FORK is set, a ForkNumber follows.
+ *
+ * If UREC_INFO_BLOCK is set, an UndoRecordBlock structure follows.
+ *
+ * If UREC_INFO_BLKPREV is set, an UndoRecPtr for the previous undo record
+ * affecting the same block follows.
+ *
+ * If UREC_INFO_TRANSACTION is set, an UndoRecordTransaction structure
+ * follows.
+ *
+ * If UREC_INFO_PAYLOAD is set, an UndoRecordPayload structure follows.
+ *
+ * When (as will often be the case) multiple structures are present, they
+ * appear in the same order in which the constants are defined here.  That
+ * is, the fork number appears first.
+ */
+#define UREC_INFO_FORK						0x01
+#define UREC_INFO_BLOCK						0x02
+#define UREC_INFO_BLKPREV					0x04
+#define UREC_INFO_TRANSACTION				0x08
+#define UREC_INFO_PAYLOAD					0x10
+
+/*
+ * Information for a block to which this record pertains.
+ */
+typedef struct UndoRecordBlock
+{
+	BlockNumber urec_block;		/* block number */
+	OffsetNumber urec_offset;	/* offset number */
+} UndoRecordBlock;
+
+#define SizeOfUndoRecordBlock \
+	(offsetof(UndoRecordBlock, urec_offset) + sizeof(OffsetNumber))
+
+/*
+ * Information for a transaction to which this undo belongs.  This
+ * also stores the dbid and the progress of the undo apply during rollback.
+ */
+typedef struct UndoRecordTransaction
+{
+	/*
+	 * This indicates the undo action apply progress: 0 means not started,
+	 * 1 means completed.  In the future, it could also be used to show how
+	 * much undo has been applied so far, using some formula.
+	 */
+	uint32		urec_progress;
+	uint32		urec_xidepoch;	/* epoch of the current transaction */
+	Oid			urec_dbid;		/* database id */
+
+	/*
+	 * Pointer to the transaction's previous undo record when the transaction
+	 * spans multiple undo logs.  The first undo record in the new log stores
+	 * a pointer to the previous undo record in the previous log, because
+	 * during rollback we can't compute that directly using prevlen.
+	 *
+	 * TODO: instead of keeping this in the transaction header, we could
+	 * introduce a new log-switch header.
+	 */
+	UndoRecPtr	urec_prevurp;
+	UndoRecPtr	urec_next;		/* urec pointer of the next transaction */
+} UndoRecordTransaction;
+
+#define SizeOfUndoRecordTransaction \
+	(offsetof(UndoRecordTransaction, urec_next) + sizeof(UndoRecPtr))
+
+/*
+ * Information about the amount of payload data and tuple data present
+ * in this record.  The payload bytes immediately follow the structures
+ * specified by flag bits in urec_info, and the tuple bytes follow the
+ * payload bytes.
+ */
+typedef struct UndoRecordPayload
+{
+	uint16		urec_payload_len;	/* # of payload bytes */
+	uint16		urec_tuple_len; /* # of tuple bytes */
+} UndoRecordPayload;
+
+#define SizeOfUndoRecordPayload \
+	(offsetof(UndoRecordPayload, urec_tuple_len) + sizeof(uint16))
+
+/*
+ * Information that can be used to create an undo record or that can be
+ * extracted from one previously created.  The raw undo record format is
+ * difficult to manage, so this structure provides a convenient intermediate
+ * form that is easier for callers to manage.
+ *
+ * When creating an undo record from an UnpackedUndoRecord, the caller should
+ * set uur_info to 0.  It will be initialized by the first call to
+ * UndoRecordSetInfo or InsertUndoRecord.  We also set it in
+ * UndoRecordAllocate for the transaction-specific header information.
+ *
+ * When an undo record is decoded into an UnpackedUndoRecord, all fields
+ * will be initialized, but those for which no information is available
+ * will be set to invalid or default values, as appropriate.
+ */
+typedef struct UnpackedUndoRecord
+{
+	RmgrId		uur_rmid;		/* rmgr ID */
+	uint8		uur_type;		/* record type code */
+	uint8		uur_info;		/* flag bits */
+	Oid			uur_reloid;		/* relation OID */
+	TransactionId uur_xid;		/* transaction id */
+	CommandId	uur_cid;		/* command id */
+	ForkNumber	uur_fork;		/* fork number */
+	UndoRecPtr	uur_blkprev;	/* byte offset of previous undo for block */
+	BlockNumber uur_block;		/* block number */
+	OffsetNumber uur_offset;	/* offset number */
+	uint32		uur_xidepoch;	/* epoch of the inserting transaction. */
+	UndoRecPtr	uur_prevurp;	/* urec pointer to the previous record in the
+								 * different log */
+	UndoRecPtr	uur_next;		/* urec pointer of the next transaction */
+	Oid			uur_dbid;		/* database id */
+
+	/* undo applying progress, see detail comment in UndoRecordTransaction */
+	uint32		uur_progress;
+	StringInfoData uur_payload; /* payload bytes */
+	StringInfoData uur_tuple;	/* tuple bytes */
+} UnpackedUndoRecord;
+
+typedef enum UndoPackStage
+{
+	UNDO_PACK_STAGE_HEADER,		/* We have not yet processed even the record
+								 * header; we need to do that next. */
+	UNDO_PACK_STAGE_FORKNUM,	/* The next thing to be processed is the
+								 * relation fork number, if present. */
+	UNDO_PACK_STAGE_BLOCK,		/* The next thing to be processed is the block
+								 * details, if present. */
+	UNDO_PACK_STAGE_BLOCKPREV,	/* The next thing to be processed is the block
+								 * prev info. */
+	UNDO_PACK_STAGE_TRANSACTION,	/* The next thing to be processed is the
+									 * transaction details, if present. */
+	UNDO_PACK_STAGE_PAYLOAD,	/* The next thing to be processed is the
+								 * payload details, if present */
+	UNDO_PACK_STAGE_PAYLOAD_DATA,	/* The next thing to be processed is the
+									 * payload data */
+	UNDO_PACK_STAGE_TUPLE_DATA, /* The next thing to be processed is the tuple
+								 * data */
+	UNDO_PACK_STAGE_UNDO_LENGTH,	/* The next thing to be processed is the
+									 * undo length. */
+	UNDO_PACK_STAGE_DONE		/* complete */
+} UndoPackStage;
+
+/*
+ * Undo record context for inserting/unpacking an undo record.  This holds
+ * the intermediate state of the undo record processed so far.
+ */
+typedef struct UndoPackContext
+{
+	UndoRecordHeader urec_hd;	/* Main header */
+	ForkNumber	urec_fork;		/* Relation fork number */
+	UndoRecordBlock urec_blk;	/* Block header */
+	UndoRecPtr	urec_blkprev;	/* Block prev */
+	UndoRecordTransaction urec_txn; /* Transaction header */
+	UndoRecordPayload urec_payload; /* Payload data */
+	char	   *urec_payloaddata;
+	char	   *urec_tupledata;
+	uint16		undo_len;		/* Length of the undo record. */
+	int			already_processed;	/* Number of bytes read/written so far */
+	int			partial_bytes;	/* Number of partial bytes read/written */
+	UndoPackStage stage;		/* Undo pack stage */
+} UndoPackContext;
+
+extern void UndoRecordSetInfo(UnpackedUndoRecord *uur);
+extern Size UndoRecordExpectedSize(UnpackedUndoRecord *uur);
+extern Size UnpackedUndoRecordSize(UnpackedUndoRecord *uur);
+extern bool InsertUndoRecord(UnpackedUndoRecord *uur, Page page,
+				 int starting_byte, int *already_written,
+				 int remaining_bytes, uint16 undo_len, bool header_only);
+extern void BeginUnpackUndo(UndoPackContext *ucontext);
+extern void UnpackUndoData(UndoPackContext *ucontext, Page page,
+			   int starting_byte);
+extern void FinishUnpackUndo(UndoPackContext *ucontext,
+				 UnpackedUndoRecord *uur);
+extern void BeginInsertUndo(UndoPackContext *ucontext,
+				UnpackedUndoRecord *uur);
+extern void InsertUndoData(UndoPackContext *ucontext, Page page,
+			   int starting_byte);
+extern void SkipInsertingUndoData(UndoPackContext *ucontext,
+					  int bytes_to_skip);
+
+#endif							/* UNDORECORD_H */
-- 
1.8.3.1

