From c2df9aa0a159d10b4522ae0c7e915ecee019759c Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 8 Aug 2019 16:05:53 -0400
Subject: [PATCH v3] New undo request manager, now with some xact.c
 integration.

---
 src/backend/access/transam/xact.c             |    9 +
 src/backend/access/undo/Makefile              |    2 +-
 src/backend/access/undo/undorequest.c         | 1101 +++++++++++++++++
 src/backend/access/undo/undostate.c           |  526 ++++++++
 src/backend/lib/rbtree.c                      |   46 +-
 src/backend/storage/ipc/ipci.c                |    3 +
 src/backend/storage/lmgr/lwlocknames.txt      |    1 +
 src/include/access/transam.h                  |    1 +
 src/include/access/undorequest.h              |   76 ++
 src/include/access/undostate.h                |   37 +
 src/include/lib/rbtree.h                      |   42 +-
 src/test/modules/Makefile                     |    1 +
 .../test_undo_request_manager/.gitignore      |    4 +
 .../test_undo_request_manager/Makefile        |   21 +
 .../expected/test_undo_request_manager.out    |   28 +
 .../sql/test_undo_request_manager.sql         |   16 +
 .../test_undo_request_manager--1.0.sql        |    9 +
 .../test_undo_request_manager.c               |  139 +++
 .../test_undo_request_manager.control         |    4 +
 src/tools/pgindent/typedefs.list              |    5 +
 20 files changed, 2043 insertions(+), 28 deletions(-)
 create mode 100644 src/backend/access/undo/undorequest.c
 create mode 100644 src/backend/access/undo/undostate.c
 create mode 100644 src/include/access/undorequest.h
 create mode 100644 src/include/access/undostate.h
 create mode 100644 src/test/modules/test_undo_request_manager/.gitignore
 create mode 100644 src/test/modules/test_undo_request_manager/Makefile
 create mode 100644 src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out
 create mode 100644 src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql
 create mode 100644 src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql
 create mode 100644 src/test/modules/test_undo_request_manager/test_undo_request_manager.c
 create mode 100644 src/test/modules/test_undo_request_manager/test_undo_request_manager.control

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d7930c077d..6eacc9e242 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -26,6 +26,7 @@
 #include "access/subtrans.h"
 #include "access/transam.h"
 #include "access/twophase.h"
+#include "access/undostate.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xloginsert.h"
@@ -2243,6 +2244,7 @@ CommitTransaction(void)
 	AtEOXact_PgStat(true, is_parallel_worker);
 	AtEOXact_Snapshot(true, false);
 	AtEOXact_ApplyLauncher(true);
+	AtCommit_UndoState();
 	pgstat_report_xact_timestamp(0);
 
 	CurrentResourceOwner = NULL;
@@ -2566,6 +2568,7 @@ AbortTransaction(void)
 	TransactionState s = CurrentTransactionState;
 	TransactionId latestXid;
 	bool		is_parallel_worker;
+	bool		perform_foreground_undo;
 
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
@@ -2729,6 +2732,8 @@ AbortTransaction(void)
 		AtEOXact_HashTables(false);
 		AtEOXact_PgStat(false, is_parallel_worker);
 		AtEOXact_ApplyLauncher(false);
+		AtAbort_UndoState(&perform_foreground_undo);
+		/* XXX need to do something with perform_foreground_undo */
 		pgstat_report_xact_timestamp(0);
 	}
 
@@ -4825,6 +4830,7 @@ CommitSubTransaction(void)
 	AtEOSubXact_PgStat(true, s->nestingLevel);
 	AtSubCommit_Snapshot(s->nestingLevel);
 	AtEOSubXact_ApplyLauncher(true, s->nestingLevel);
+	AtSubCommit_UndoState(s->nestingLevel);
 
 	/*
 	 * We need to restore the upper transaction's read-only state, in case the
@@ -4852,6 +4858,7 @@ static void
 AbortSubTransaction(void)
 {
 	TransactionState s = CurrentTransactionState;
+	bool	perform_foreground_undo;
 
 	/* Prevent cancel/die interrupt while cleaning up */
 	HOLD_INTERRUPTS();
@@ -4979,6 +4986,8 @@ AbortSubTransaction(void)
 		AtEOSubXact_PgStat(false, s->nestingLevel);
 		AtSubAbort_Snapshot(s->nestingLevel);
 		AtEOSubXact_ApplyLauncher(false, s->nestingLevel);
+		AtSubAbort_UndoState(s->nestingLevel, &perform_foreground_undo);
+		/* XXX need to do something with perform_foreground_undo */
 	}
 
 	/*
diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile
index 219c6963cf..845a4802b4 100644
--- a/src/backend/access/undo/Makefile
+++ b/src/backend/access/undo/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undolog.o
+OBJS = undolog.o undorequest.o undostate.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/undo/undorequest.c b/src/backend/access/undo/undorequest.c
new file mode 100644
index 0000000000..37660cdb5f
--- /dev/null
+++ b/src/backend/access/undo/undorequest.c
@@ -0,0 +1,1101 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorequest.c
+ *		Undo request manager.
+ *
+ * From the moment a transaction begins until the moment that it commits,
+ * there is a possibility that it might abort, either due to an exception
+ * or because the entire system is restarted (e.g. because of a power
+ * cut). If this happens, all undo generated by that transaction prior
+ * to the abort must be applied.  To ensure this, the calling code must
+ * ensure that an "undo request" is registered for every transaction
+ * that generates undo.
+ *
+ * The undo request should be registered before the transaction writes any
+ * undo records (except for temporary undo records, which the creating backend
+ * will need to process locally). If the transaction goes on to commit, the
+ * undo request can be deleted; if it goes on to abort, it needs to be updated
+ * with the final size of the undo generated by that transaction so that
+ * we can prioritize it appropriately. One of the key tasks of this module
+ * is to decide on the order in which undo requests should be processed;
+ * see GetNextUndoRequest for details.
+ *
+ * We have only a fixed amount of shared memory to store undo requests;
+ * because an undo request has to be created before any undo that might
+ * need to be processed is written, we should never end up in a situation
+ * where there are more existing undo requests than can fit. In extreme
+ * cases, this might cause us to have to refuse to create new requests,
+ * but that should be very rare.  If we're starting to run low on space,
+ * PerformUndoInBackground() will signal callers that undo should be
+ * performed in the foreground; actually hitting the hard limit requires
+ * foreground undo to be interrupted by a crash.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorequest.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/undorequest.h"
+#include "lib/rbtree.h"
+#include "storage/shmem.h"
+#include "utils/timestamp.h"
+
+/*
+ * An UndoRequest represents the possible need to perform undo actions for
+ * a transaction if it aborts; thus, it should be allocated before writing
+ * undo that might require the system to perform cleanup actions (except
+ * temporary undo, for which the backend is always responsible) and
+ * deallocated when it is clear that no such actions will need to be
+ * performed or when they have all been performed successfully.
+ *
+ * At any given time, an UndoRequest is in one of three states: FREE (not
+ * allocated to any transaction; available for reuse), UNLISTED (allocated
+ * to a transaction but not in any RBTree), or LISTED (allocated to a
+ * transaction and in either both requests_by_fxid and requests_by_size or
+ * else in requests_by_retry_time).
+ *
+ * Changes to UndoRequest objects are protected by the UndoRequestManager's
+ * lock, but not all changes require the lock.  The following rules apply:
+ *
+ * fxid must be InvalidFullTransactionId if and only if the UndoRequest is
+ * FREE, and may only be changed while holding the lock.
+ *
+ * next_free_request must be NULL unless the UndoRequest is FREE, and may
+ * only be changed while holding the lock.
+ *
+ * The remaining fields must be accurate if the UndoRequest is LISTED, but
+ * otherwise may or may not contain correct data. They should not be changed
+ * while the request is FREE, may be changed without holding the lock while
+ * the request is UNLISTED, and may only be changed while holding the lock
+ * if the request is LISTED.
+ *
+ * Callers must be careful never to lose track of an entry that is UNLISTED;
+ * such entries will be permanently leaked. An entry that is FREE can be
+ * reallocated by this module, while one that is LISTED should eventually
+ * get processed and become FREE, but an UNLISTED entry remains the caller's
+ * responsibility until the state is changed.
+ */
+struct UndoRequest
+{
+	FullTransactionId fxid;		/* invalid if and only if request is FREE */
+	Oid			dbid;			/* database given when request was set up */
+	Size		size;			/* undo size in bytes; 0 until known */
+	UndoRecPtr	start_location_logged;	/* invalid if no logged undo */
+	UndoRecPtr	end_location_logged;
+	UndoRecPtr	start_location_unlogged;	/* invalid if no unlogged undo */
+	UndoRecPtr	end_location_unlogged;
+	TimestampTz retry_time;		/* DT_NOBEGIN until an undo attempt fails */
+	UndoRequest *next_free_request; /* free list link; NULL unless FREE */
+};
+
+/*
+ * An UndoRequestNode just points to an UndoRequest. We use it so that the
+ * same UndoRequest can be placed into more than one RBTree at the same
+ * time.
+ */
+typedef struct UndoRequestNode
+{
+	RBTNode		rbtnode;		/* tree linkage; first, so node casts work */
+	UndoRequest *req;			/* the request this tree entry stands for */
+} UndoRequestNode;
+
+/*
+ * Possible sources of UndoRequest objects in need of processing.
+ */
+typedef enum UndoRequestSource
+{
+	UNDO_SOURCE_FXID,			/* take from requests_by_fxid */
+	UNDO_SOURCE_SIZE,			/* take from requests_by_size */
+	UNDO_SOURCE_RETRY_TIME		/* take from requests_by_retry_time */
+} UndoRequestSource;
+
+/*
+ * An UndoRequestManager manages a collection of UndoRequest and
+ * UndoRequestNode objects. Typically, there would only be one such object
+ * for the whole system, but it's possible to create others for testing
+ * purposes.
+ */
+struct UndoRequestManager
+{
+	LWLock	   *lock;			/* for synchronization */
+	Size		capacity;		/* max # of UndoRequests */
+	Size		utilization;	/* # of non-FREE UndoRequests */
+	Size		soft_size_limit;	/* threshold to not background */
+	UndoRequestSource source;	/* which RBTree to check next? */
+	RBTree		requests_by_fxid;	/* lower FXIDs first */
+	RBTree		requests_by_size;	/* bigger sizes first */
+	RBTree		requests_by_retry_time; /* sooner retry times first */
+	bool		oldest_fxid_valid;	/* true if next field is valid */
+	FullTransactionId oldest_fxid;	/* oldest FXID of any UndoRequest */
+	UndoRequest *all_requests;	/* start of the UndoRequest array */
+	UndoRequest *first_free_request;	/* head of UndoRequest free list */
+	UndoRequestNode *first_free_request_node;	/* head of node free list */
+};
+
+/* Static functions. */
+static UndoRequest *FindUndoRequestForDatabase(UndoRequestManager *urm,
+											   Oid dbid);
+static bool BackgroundUndoOK(UndoRequestManager *urm,
+							 UndoRequest *req);
+static RBTNode *UndoRequestNodeAllocate(void *arg);
+static void UndoRequestNodeFree(RBTNode *x, void *arg);
+static void UndoRequestNodeCombine(RBTNode *existing, const RBTNode *newdata,
+								   void *arg);
+static int	UndoRequestNodeCompareRetryTime(const RBTNode *a,
+											const RBTNode *b,
+											void *arg);
+static int	UndoRequestNodeCompareFXID(const RBTNode *a, const RBTNode *b,
+									   void *arg);
+static int	UndoRequestNodeCompareSize(const RBTNode *a, const RBTNode *b,
+									   void *arg);
+static void InsertUndoRequest(RBTree *rbt, UndoRequest *req);
+static void RemoveUndoRequest(RBTree *rbt, UndoRequest *req);
+static UndoRequest *FindUndoRequest(UndoRequestManager *urm,
+									FullTransactionId fxid);
+
+/*
+ * Compute the amount of space that will be needed by an undo request manager.
+ *
+ * We need space for the UndoRequestManager itself, for the UndoRequest
+ * objects, and for the UndoRequestNode objects.  We need twice as many
+ * UndoRequestNode objects as we do UndoRequest objects, because unfailed
+ * requests are stored in both requests_by_fxid and requests_by_size; failed
+ * requests are stored only in requests_by_retry_time.
+ */
+Size
+EstimateUndoRequestManagerSize(Size capacity)
+{
+	Size		request_bytes = mul_size(capacity, sizeof(UndoRequest));
+	Size		node_bytes = mul_size(capacity,
+									  mul_size(2, sizeof(UndoRequestNode)));
+	Size		total = MAXALIGN(sizeof(UndoRequestManager));
+
+	total = add_size(total, MAXALIGN(request_bytes));
+	return add_size(total, MAXALIGN(node_bytes));
+}
+
+/*
+ * Initialize an undo request manager.
+ *
+ * The caller is responsible for providing an appropriately-sized chunk of
+ * memory; use EstimateUndoRequestManagerSize to find out how much space will
+ * be needed. This means that this infrastructure can potentially be used in
+ * either shared memory or, if desired, in backend-private memory. It will not
+ * work in DSM, though, because it uses pointers.
+ *
+ * The caller must also provide a lock that will be used to protect access
+ * to the data managed by this undo request manager.  This cannot be NULL,
+ * even if the memory is private.
+ */
+void
+InitializeUndoRequestManager(UndoRequestManager *urm, LWLock *lock,
+							 Size capacity, Size soft_limit)
+{
+	UndoRequest *reqs;
+	UndoRequestNode *nodes;
+	Size		nnodes = 2 * capacity;
+	Size		i;
+
+	/* A lock is required even if the memory is backend-private. */
+	Assert(lock != NULL);
+
+	/* Basic initialization. */
+	urm->lock = lock;
+	urm->capacity = capacity;
+	urm->utilization = 0;
+	urm->soft_size_limit = soft_limit;
+	urm->source = UNDO_SOURCE_FXID;
+	rbt_initialize(&urm->requests_by_fxid, sizeof(UndoRequestNode),
+				   UndoRequestNodeCompareFXID, UndoRequestNodeCombine,
+				   UndoRequestNodeAllocate, UndoRequestNodeFree, urm);
+	rbt_initialize(&urm->requests_by_size, sizeof(UndoRequestNode),
+				   UndoRequestNodeCompareSize, UndoRequestNodeCombine,
+				   UndoRequestNodeAllocate, UndoRequestNodeFree, urm);
+	rbt_initialize(&urm->requests_by_retry_time, sizeof(UndoRequestNode),
+				   UndoRequestNodeCompareRetryTime, UndoRequestNodeCombine,
+				   UndoRequestNodeAllocate, UndoRequestNodeFree, urm);
+	urm->oldest_fxid_valid = true;
+	urm->oldest_fxid = InvalidFullTransactionId;
+
+	/* Find memory for UndoRequest and UndoRequestNode arenas. */
+	reqs = (UndoRequest *)
+		(((char *) urm) + MAXALIGN(sizeof(UndoRequestManager)));
+	urm->all_requests = reqs;
+	nodes = (UndoRequestNode *)
+		(((char *) reqs) + MAXALIGN(capacity * sizeof(UndoRequest)));
+
+	/*
+	 * Build a free list of UndoRequest objects.  A FREE request must have an
+	 * invalid fxid, so set that explicitly instead of relying on the caller
+	 * to have zeroed the memory.  An empty arena yields an empty list.
+	 */
+	urm->first_free_request = (capacity > 0) ? reqs : NULL;
+	for (i = 0; i < capacity; ++i)
+	{
+		reqs[i].fxid = InvalidFullTransactionId;
+		reqs[i].next_free_request = (i + 1 < capacity) ? &reqs[i + 1] : NULL;
+	}
+
+	/*
+	 * Similarly, build a free list of UndoRequestNode objects.  In this case,
+	 * we use the first few bytes of the free object to store a pointer to the
+	 * next free object.
+	 */
+	StaticAssertStmt(sizeof(UndoRequestNode) >= sizeof(UndoRequestNode *),
+					 "UndoRequestNode is too small");
+	urm->first_free_request_node = (nnodes > 0) ? nodes : NULL;
+	for (i = 0; i < nnodes; ++i)
+		*(UndoRequestNode **) &nodes[i] =
+			(i + 1 < nnodes) ? &nodes[i + 1] : NULL;
+}
+
+/*
+ * Register a new undo request. If unable, returns NULL.
+ *
+ * This function should be called before a transaction first writes any undo;
+ * at end of transaction, the caller must call either UnregisterUndoRequest (on
+ * commit) or FinalizeUndoRequest (on abort).
+ *
+ * The returned request is UNLISTED (as defined above).
+ */
+UndoRequest *
+RegisterUndoRequest(UndoRequestManager *urm, FullTransactionId fxid, Oid dbid)
+{
+	UndoRequest *result;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	result = urm->first_free_request;
+	if (result == NULL)
+	{
+		LWLockRelease(urm->lock);
+		return NULL;			/* free list is empty */
+	}
+
+	/* Detach the request from the free list and count it as in use. */
+	urm->first_free_request = result->next_free_request;
+	result->next_free_request = NULL;
+	urm->utilization++;
+
+	/* Start from a clean slate: no undo recorded, no failed attempts. */
+	result->fxid = fxid;
+	result->dbid = dbid;
+	result->size = 0;
+	result->start_location_logged = InvalidUndoRecPtr;
+	result->start_location_unlogged = InvalidUndoRecPtr;
+	result->end_location_logged = InvalidUndoRecPtr;
+	result->end_location_unlogged = InvalidUndoRecPtr;
+	result->retry_time = DT_NOBEGIN;
+
+	/* While the cached oldest FXID is valid, keep it up to date. */
+	if (urm->oldest_fxid_valid &&
+		(!FullTransactionIdIsValid(urm->oldest_fxid)
+		 || FullTransactionIdPrecedes(fxid, urm->oldest_fxid)))
+		urm->oldest_fxid = fxid;
+
+	LWLockRelease(urm->lock);
+	return result;
+}
+
+/*
+ * Finalize details for an undo request.
+ *
+ * Since an UndoRequest should be registered before beginning to write undo,
+ * the undo size won't be known at that point; this function should be called
+ * at prepare time for a prepared transaction, or at abort time otherwise, by
+ * which point the size should be known.
+ *
+ * Caller should report the total size of generated undo in bytes, counting
+ * only logged and unlogged undo that will be processed by background workers.
+ * Any undo bytes that aren't part of the logged or unlogged undo records
+ * that may need cleanup actions performed should not be included in size;
+ * for example, temporary undo doesn't count, as the caller must deal with
+ * that outside of this mechanism.
+ *
+ * Caller must pass the start and end locations for logged and unlogged undo;
+ * an end location must be InvalidUndoRecPtr if and only if the corresponding
+ * start location is.  The assertions compare the arguments with each other,
+ * since req's own locations are only assigned below.
+ *
+ * No lock is needed, because this request must be UNLISTED (defined above).
+ */
+void
+FinalizeUndoRequest(UndoRequestManager *urm, UndoRequest *req, Size size,
+					UndoRecPtr start_location_logged,
+					UndoRecPtr start_location_unlogged,
+					UndoRecPtr end_location_logged,
+					UndoRecPtr end_location_unlogged)
+{
+	Assert(size != 0);
+	Assert(UndoRecPtrIsValid(end_location_logged) ||
+		   UndoRecPtrIsValid(end_location_unlogged));
+	Assert(UndoRecPtrIsValid(end_location_logged) ==
+		   UndoRecPtrIsValid(start_location_logged));
+	Assert(UndoRecPtrIsValid(end_location_unlogged) ==
+		   UndoRecPtrIsValid(start_location_unlogged));
+	req->size = size;
+	req->start_location_logged = start_location_logged;
+	req->start_location_unlogged = start_location_unlogged;
+	req->end_location_logged = end_location_logged;
+	req->end_location_unlogged = end_location_unlogged;
+}
+
+/*
+ * Release a previously-allocated undo request.
+ *
+ * On entry, the undo request should be either LISTED or UNLISTED; on exit,
+ * it will be FREE (as these terms are defined above).
+ *
+ * This should be used at transaction commit, if an UndoRequest was
+ * registered, or when undo for an aborted transaction has been successfully
+ * processed.
+ *
+ * Because this function may be called as a post-commit step, it must never
+ * throw an ERROR.
+ */
+void
+UnregisterUndoRequest(UndoRequestManager *urm, UndoRequest *req)
+{
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	/*
+	 * Remove the UndoRequest from any RBTree that contains it.  A retry time
+	 * other than DT_NOBEGIN means undo failed after finalization; a size of 0
+	 * means the request was never finalized, so it's not in any RBTree.
+	 * NOTE(review): a finalized but currently UNLISTED request also has size
+	 * != 0; confirm RemoveUndoRequest tolerates a request not in the tree.
+	 */
+	if (req->retry_time != DT_NOBEGIN)
+		RemoveUndoRequest(&urm->requests_by_retry_time, req);
+	else if (req->size != 0)
+	{
+		RemoveUndoRequest(&urm->requests_by_fxid, req);
+		RemoveUndoRequest(&urm->requests_by_size, req);
+	}
+
+	/* Plan to recompute oldest_fxid, if necessary. */
+	if (FullTransactionIdEquals(req->fxid, urm->oldest_fxid))
+		urm->oldest_fxid_valid = false;
+
+	/* FREE requests carry no fxid; clear it only after the check above. */
+	req->fxid = InvalidFullTransactionId;
+	req->next_free_request = urm->first_free_request;
+	urm->first_free_request = req;
+	--urm->utilization;
+
+	LWLockRelease(urm->lock);
+}
+
+/*
+ * Try to hand an undo request off for background processing.
+ *
+ * If this function returns true, the UndoRequest can be left for background
+ * processing; the caller need not do anything more. If this function returns
+ * false, the caller should try to process it in the foreground, and must
+ * call either UnregisterUndoRequest on success or RescheduleUndoRequest
+ * on failure.
+ *
+ * Because this function may be called during transaction abort, it must
+ * never throw an ERROR. Technically, InsertUndoRequest might reach
+ * UndoRequestNodeAllocate which could ERROR if the freelist is empty, but
+ * if that happens there's a bug someplace.
+ *
+ * On entry, the UndoRequest should be UNLISTED; on exit, it is LISTED
+ * if this function returns true, and remains UNLISTED if this function
+ * returns false (see above for definitions).
+ */
+bool
+PerformUndoInBackground(UndoRequestManager *urm, UndoRequest *req)
+{
+	bool		hand_off;
+	bool		has_logged = UndoRecPtrIsValid(req->start_location_logged);
+	bool		has_unlogged = UndoRecPtrIsValid(req->start_location_unlogged);
+
+	/*
+	 * If neither start location was ever set, we failed after allocating
+	 * the UndoRequest but before there was any work to do, so the request
+	 * can simply be unregistered.
+	 */
+	if (!has_logged && !has_unlogged)
+	{
+		UnregisterUndoRequest(urm, req);
+		return true;
+	}
+
+	/*
+	 * Deciding whether this undo may be performed in the background means
+	 * looking at shared state, and so does adding the request to
+	 * requests_by_fxid and requests_by_size; hold the lock across both
+	 * steps.
+	 */
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+	hand_off = BackgroundUndoOK(urm, req);
+	if (hand_off)
+	{
+		/*
+		 * Background processing it is: list the request in both trees so
+		 * that GetNextUndoRequest can find it.
+		 */
+		InsertUndoRequest(&urm->requests_by_fxid, req);
+		InsertUndoRequest(&urm->requests_by_size, req);
+	}
+	LWLockRelease(urm->lock);
+
+	return hand_off;
+}
+
+/*
+ * Get an undo request that needs background processing.
+ *
+ * Unless dbid is InvalidOid, any request returned must be from the indicated
+ * database.  If minimum_runtime_reached is true, the caller only wants to
+ * process another request if the next request happens to be from the correct
+ * database. If it's false, the caller wants to avoid exiting too quickly,
+ * and would like to process a request from the database if there's one
+ * available.
+ *
+ * If no suitable request is found, NULL is returned and *out_dbid gets
+ * InvalidOid; otherwise, *out_dbid gets the database OID of the request and
+ * the parameters which follow get the start and end locations of logged
+ * and unlogged undo for that transaction.  It's possible that the transaction
+ * wrote only logged undo or only unlogged undo, in which case the other
+ * pair of fields will have a value of InvalidUndoRecPtr, but it should never
+ * happen that all of the fields get InvalidUndoRecPtr, because that would
+ * mean we queued up an UndoRequest to do nothing.
+ *
+ * This function, as a side effect, makes the returned UndoRequest UNLISTED,
+ * as defined above, so that no other backend will attempt to process it
+ * simultaneously. The caller must be certain to call either
+ * UnregisterUndoRequest (if successful) or RescheduleUndoRequest (on
+ * failure) to avoid leaking the UndoRequest.
+ */
+UndoRequest *
+GetNextUndoRequest(UndoRequestManager *urm, Oid dbid,
+				   bool minimum_runtime_reached,
+				   Oid *out_dbid,
+				   UndoRecPtr *start_location_logged,
+				   UndoRecPtr *end_location_logged,
+				   UndoRecPtr *start_location_unlogged,
+				   UndoRecPtr *end_location_unlogged)
+{
+	UndoRequest *req = NULL;
+	int			nloops;
+	bool		saw_db_mismatch = false;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	/* Some might have no work, so loop until all are checked. */
+	for (nloops = 0; nloops < 3; ++nloops)
+	{
+		RBTree	   *rbt;
+		UndoRequestSource source = urm->source;
+		UndoRequestNode *node;
+
+		/*
+		 * We rotate between the three possible sources of UndoRequest
+		 * objects.
+		 *
+		 * The idea here is that processing the requests with the oldest
+		 * transaction IDs is important because it helps us discard undo log
+		 * data sooner and because it allows XID horizons to advance. On the
+		 * other hand, handling transactions that generated a very large
+		 * amount of undo is also a priority, because undo will probably take
+		 * a long time to finish and thus should be started as early as possible
+		 * and also because it likely touched a large number of pages which
+		 * will be slow to access until the undo is processed.
+		 *
+		 * However, we also need to make sure to periodically retry undo for
+		 * transactions that previously failed. We hope that this will be very
+		 * rare, but if it does happen we can neither afford to retry those
+		 * transactions over and over in preference to all others, nor on the
+		 * other hand to just ignore them forever.
+		 *
+		 * We could try to come up with some scoring system that assigns
+		 * relative levels of importance to FullTransactionId age, undo size,
+		 * and retry time, but it seems difficult to come up with a weighting
+		 * system that can ensure that nothing gets starved. By rotating among
+		 * the sources evenly, we know that as long as we continue to process
+		 * undo requests on some sort of regular basis, each source will get
+		 * some amount of attention.
+		 */
+		switch (source)
+		{
+			case UNDO_SOURCE_FXID:
+				rbt = &urm->requests_by_fxid;
+				urm->source = UNDO_SOURCE_SIZE;
+				break;
+			case UNDO_SOURCE_SIZE:
+				rbt = &urm->requests_by_size;
+				urm->source = UNDO_SOURCE_RETRY_TIME;
+				break;
+			case UNDO_SOURCE_RETRY_TIME:
+				rbt = &urm->requests_by_retry_time;
+				urm->source = UNDO_SOURCE_FXID;
+				break;
+		}
+
+		/* Get highest-priority item. */
+		node = (UndoRequestNode *) rbt_leftmost(rbt);
+		if (node == NULL)
+			continue;
+
+		/*
+		 * We can only take an item from the retry time RBTree if the retry
+		 * time is in the past.
+		 */
+		if (source == UNDO_SOURCE_RETRY_TIME &&
+			node->req->retry_time > GetCurrentTimestamp())
+			continue;
+
+		/*
+		 * If a database OID was specified, it must match. If it does not, we
+		 * go ahead and try any remaining RBTree.  Note that this needs to be
+		 * after the other tests so that we get the right value for the
+		 * saw_db_mismatch flag.
+		 */
+		if (OidIsValid(dbid) && node->req->dbid != dbid)
+		{
+			saw_db_mismatch = true;
+			continue;
+		}
+
+		/* Looks like we have a winner. */
+		req = node->req;
+		break;
+	}
+
+	/*
+	 * Determine whether we should do a more exhaustive search.
+	 *
+	 * If we found a node, we don't need to look any harder.  If we didn't see a
+	 * database mismatch, then looking harder can't help: there's nothing to
+	 * do at all, never mind for which database.  If the caller set
+	 * minimum_runtime_reached, then they don't want us to look harder.
+	 */
+	if (req == NULL && saw_db_mismatch && !minimum_runtime_reached)
+		req = FindUndoRequestForDatabase(urm, dbid);
+
+	/*
+	 * If we found a suitable request, remove it from any RBTree that contains
+	 * it.
+	 */
+	if (req != NULL)
+	{
+		if (req->retry_time != DT_NOBEGIN)
+			RemoveUndoRequest(&urm->requests_by_retry_time, req);
+		else
+		{
+			RemoveUndoRequest(&urm->requests_by_fxid, req);
+			RemoveUndoRequest(&urm->requests_by_size, req);
+		}
+	}
+
+	LWLockRelease(urm->lock);
+
+	/*
+	 * Set output parameters, defaulting all of them to invalid values.  Any
+	 * request we found is now UNLISTED, so no lock is needed to read it.
+	 */
+	*out_dbid = InvalidOid;
+	*start_location_logged = *end_location_logged = InvalidUndoRecPtr;
+	*start_location_unlogged = *end_location_unlogged = InvalidUndoRecPtr;
+	if (req != NULL)
+	{
+		*out_dbid = req->dbid;
+		*start_location_logged = req->start_location_logged;
+		*end_location_logged = req->end_location_logged;
+		*start_location_unlogged = req->start_location_unlogged;
+		*end_location_unlogged = req->end_location_unlogged;
+	}
+
+	return req;
+}
+
+/*
+ * Reschedule an undo request after undo failure.
+ *
+ * This function should be called when undo processing fails, either in the
+ * foreground or in the background.  The foreground case occurs when
+ * PerformUndoInBackground returns false and undo then also fails; the
+ * background case occurs when GetNextUndoRequest returns an UndoRequest and
+ * undo then fails. Note that this function isn't used after a shutdown or
+ * crash: see comments in RecreateUndoRequest for how we handle that case.
+ *
+ * In either of the cases where this function is reached, the UndoRequest
+ * should be UNLISTED; on return, it will be LISTED (both as defined above).
+ * If it's a foreground undo failure, it's never been LISTED; if it's a
+ * background undo failure, it was made UNLISTED by GetNextUndoRequest. So,
+ * we don't have to remove the request from anywhere, not even conditionally;
+ * we just need to add it to the set of failed requests.
+ *
+ * Because this function may be called during transaction abort, it must
+ * never throw an ERROR. Technically, InsertUndoRequest might reach
+ * UndoRequestNodeAllocate which could ERROR if the freelist is empty, but
+ * if that happens there's a bug someplace.
+ */
+void
+RescheduleUndoRequest(UndoRequestManager *urm, UndoRequest *req)
+{
+	int			delay_ms;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	/*
+	 * Keep the retry policy deliberately simple: wait 10 seconds before the
+	 * first retry and 30 seconds before every later one.  A retry time that
+	 * is still DT_NOBEGIN identifies the first failure.  Anything fancier
+	 * would need more bookkeeping, and it's unclear what we'd gain.
+	 */
+	delay_ms = (req->retry_time == DT_NOBEGIN) ? 10 * 1000 : 30 * 1000;
+	req->retry_time =
+		TimestampTzPlusMilliseconds(GetCurrentTimestamp(), delay_ms);
+
+	/* Failed requests are kept only in requests_by_retry_time. */
+	InsertUndoRequest(&urm->requests_by_retry_time, req);
+
+	LWLockRelease(urm->lock);
+}
+
+/*
+ * Recreate UndoRequest state after a shutdown.
+ *
+ * This function is expected to be called after a shutdown, whether a clean
+ * shutdown or a crash, both for aborted transactions with unprocessed undo
+ * and also for prepared transactions.  All calls to this function must be
+ * completed, and SuspendPreparedUndoRequest must be called for every prepared
+ * transaction, before the first call to GetNextUndoRequest occurs.
+ *
+ * This function may be called up to twice per FullTransactionId, once with
+ * is_logged true and once with is_logged false, because the transaction may
+ * have both logged and unlogged undo in different places. start_location is
+ * the beginning of the type of undo indicated by the is_logged parameter, and
+ * size is the amount of such undo in bytes.  If this function is called twice,
+ * the result will be a single UndoRequest containing both start locations and
+ * a size which is the sum of the two sizes passed to the separate calls.
+ *
+ * If this function is unable to allocate a new UndoRequest when required,
+ * it will return false.  If that happens, it's not safe to continue using
+ * this UndoRequestManager and a system-wide shutdown to raise the limit on
+ * the number of outstanding requests is indicated.
+ */
+bool
+RecreateUndoRequest(UndoRequestManager *urm, FullTransactionId fxid,
+					Oid dbid, bool is_logged, UndoRecPtr start_location,
+					UndoRecPtr end_location, Size size)
+{
+	UndoRequest *req;
+	bool		is_new;
+
+	Assert(UndoRecPtrIsValid(start_location));
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+	req = FindUndoRequest(urm, fxid);
+	is_new = (req == NULL);
+	if (is_new)
+	{
+		/* First call for this FullTransactionId. */
+		req = urm->first_free_request;
+		if (req == NULL)
+		{
+			LWLockRelease(urm->lock);
+			return false;
+		}
+
+		/* We got an item; pop it from the free list and count it. */
+		urm->first_free_request = req->next_free_request;
+		req->next_free_request = NULL;
+		++urm->utilization;
+
+		/* Initialize request object; all locations start out invalid. */
+		req->fxid = fxid;
+		req->dbid = dbid;
+		req->size = 0;
+		req->start_location_logged = InvalidUndoRecPtr;
+		req->end_location_logged = InvalidUndoRecPtr;
+		req->start_location_unlogged = InvalidUndoRecPtr;
+		req->end_location_unlogged = InvalidUndoRecPtr;
+		req->retry_time = DT_NOBEGIN;
+
+		/* Save this fxid as the oldest one, if necessary. */
+		if (urm->oldest_fxid_valid &&
+			(!FullTransactionIdIsValid(urm->oldest_fxid)
+			 || FullTransactionIdPrecedes(fxid, urm->oldest_fxid)))
+			urm->oldest_fxid = fxid;
+	}
+	else
+	{
+		/*
+		 * Already called for opposite value of is_logged.  Adjusting size
+		 * may change position in RBTree, so unlink before updating it.
+		 */
+		Assert(req->dbid == dbid);
+		RemoveUndoRequest(&urm->requests_by_size, req);
+	}
+
+	/* Record both ends of this kind of undo; each may be set only once. */
+	if (is_logged)
+	{
+		Assert(!UndoRecPtrIsValid(req->start_location_logged));
+		Assert(!UndoRecPtrIsValid(req->end_location_logged));
+		req->start_location_logged = start_location;
+		req->end_location_logged = end_location;
+	}
+	else
+	{
+		Assert(!UndoRecPtrIsValid(req->start_location_unlogged));
+		Assert(!UndoRecPtrIsValid(req->end_location_unlogged));
+		req->start_location_unlogged = start_location;
+		req->end_location_unlogged = end_location;
+	}
+	req->size += size;
+
+	/*
+	 * List this request so that undo workers will see it.  It is treated as
+	 * a new abort: failures before the shutdown or crash aren't remembered,
+	 * which is fine (and FindUndoRequest would get confused otherwise).
+	 */
+	if (is_new)
+		InsertUndoRequest(&urm->requests_by_fxid, req);
+	InsertUndoRequest(&urm->requests_by_size, req);
+
+	LWLockRelease(urm->lock);
+	return true;
+}
+
+/*
+ * Adjust UndoRequestManager state for prepared transactions.
+ *
+ * After a restart, once all calls to RecreateUndoRequest have been completed
+ * and before the first call to GetNextUndoRequest, this function should
+ * be called for each prepared transaction. That's necessary to avoid
+ * prematurely executing undo actions for transactions that haven't aborted
+ * yet and might go on to commit. The UndoRequest for the indicated fxid is
+ * made UNLISTED (as defined above) so that GetNextUndoRequest does not find
+ * it.
+ *
+ * The caller should retain a pointer to the returned UndoRequest and, when
+ * the prepared transaction is eventually committed or rolled back, should
+ * invoke UnregisterUndoRequest on commit or FinalizeUndoRequest on abort.
+ */
+UndoRequest *
+SuspendPreparedUndoRequest(UndoRequestManager *urm, FullTransactionId fxid)
+{
+	UndoRequest *req;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+	req = FindUndoRequest(urm, fxid);
+	Assert(req != NULL);
+	Assert(req->size != 0);
+	RemoveUndoRequest(&urm->requests_by_fxid, req);
+	RemoveUndoRequest(&urm->requests_by_size, req);
+	LWLockRelease(urm->lock);
+
+	return req;
+}
+
+/*
+ * Return the smallest FXID held by any UndoRequest slot, LISTED or UNLISTED
+ * (as defined above), or InvalidFullTransactionId if no slot holds a valid
+ * one.  The answer is cached in the manager and only recomputed by a full
+ * scan of all_requests when the cache has been marked invalid.
+ */
+FullTransactionId
+UndoRequestManagerOldestFXID(UndoRequestManager *urm)
+{
+	FullTransactionId oldest = InvalidFullTransactionId;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	if (!urm->oldest_fxid_valid)
+	{
+		int			slot;
+
+		for (slot = 0; slot < urm->capacity; ++slot)
+		{
+			FullTransactionId candidate = urm->all_requests[slot].fxid;
+
+			if (!FullTransactionIdIsValid(candidate))
+				continue;
+			if (!FullTransactionIdIsValid(oldest) ||
+				FullTransactionIdPrecedes(candidate, oldest))
+				oldest = candidate;
+		}
+
+		urm->oldest_fxid = oldest;
+		urm->oldest_fxid_valid = true;
+	}
+	oldest = urm->oldest_fxid;
+
+	LWLockRelease(urm->lock);
+
+	return oldest;
+}
+
+/*
+ * Search all three RBTrees for a request belonging to the given database.
+ *
+ * Each tree is walked left to right, but the walks are interleaved one
+ * step at a time, so the first match returned is the one nearest the front
+ * of whichever tree it was found in.
+ *
+ * XXX: There is no limit on this search, so in the worst case it visits
+ * every node of every RBTree; it's unclear how to pick a reasonable limit.
+ */
+static UndoRequest *
+FindUndoRequestForDatabase(UndoRequestManager *urm, Oid dbid)
+{
+	RBTreeIterator walker[3];
+	int			exhausted = 0;	/* bit n set => walker[n] has run dry */
+	int			which;
+
+	rbt_begin_iterate(&urm->requests_by_fxid, LeftRightWalk, &walker[0]);
+	rbt_begin_iterate(&urm->requests_by_size, LeftRightWalk, &walker[1]);
+	rbt_begin_iterate(&urm->requests_by_retry_time, LeftRightWalk, &walker[2]);
+
+	/*
+	 * Take one step from each tree in turn, skipping trees that have run
+	 * dry, and stop once all three bits are set.
+	 */
+	for (which = 0; exhausted != 7; which = (which + 1) % 3)
+	{
+		UndoRequestNode *node;
+
+		/* Nothing more to fetch from this tree. */
+		if ((exhausted & (1 << which)) != 0)
+			continue;
+
+		node = (UndoRequestNode *) rbt_iterate(&walker[which]);
+		if (node == NULL)
+			exhausted |= 1 << which;
+		else if (node->req->dbid == dbid)
+			return node->req;
+	}
+
+	return NULL;
+}
+
+/*
+ * Decide whether this UndoRequest may be left for background processing.
+ */
+static bool
+BackgroundUndoOK(UndoRequestManager *urm, UndoRequest *req)
+{
+	bool		under_soft_limit;
+
+	/*
+	 * The only rule so far: once utilization has gone past the soft size
+	 * limit, refuse background processing; otherwise allow it.  The request
+	 * itself is not examined yet.
+	 *
+	 * TODO: We probably want to introduce some additional rules here based on
+	 * the size of the request.
+	 */
+	under_soft_limit = (urm->utilization <= urm->soft_size_limit);
+
+	return under_soft_limit;
+}
+
+/*
+ * RBTree allocation callback: hand out a preallocated UndoRequestNode.
+ *
+ * No memory is actually allocated here; we just pop the manager's freelist.
+ */
+static RBTNode *
+UndoRequestNodeAllocate(void *arg)
+{
+	UndoRequestManager *manager = (UndoRequestManager *) arg;
+	UndoRequestNode *head = manager->first_free_request_node;
+
+	/*
+	 * An UndoRequest is listed in requests_by_fxid and requests_by_size
+	 * together, or in requests_by_retry_time alone, or in no RBTree at
+	 * all, so no request should ever need more than 2 UndoRequestNode
+	 * objects.  That many are preallocated, so the freelist should never
+	 * be empty here.
+	 *
+	 * Check anyway, even when Asserts are disabled, in case of a logic bug.
+	 */
+	if (head == NULL)
+		elog(ERROR, "no free UndoRequestNode");
+
+	/* A free node's leading bytes hold the link to the next free node. */
+	manager->first_free_request_node = *(UndoRequestNode **) head;
+
+	return &head->rbtnode;
+}
+
+/*
+ * RBTree free callback: return an UndoRequestNode to the manager's freelist.
+ *
+ * The node's leading bytes are overwritten with the link to the old head.
+ */
+static void
+UndoRequestNodeFree(RBTNode *x, void *arg)
+{
+	UndoRequestManager *manager = (UndoRequestManager *) arg;
+	UndoRequestNode **link = (UndoRequestNode **) x;
+
+	*link = manager->first_free_request_node;
+	manager->first_free_request_node = (UndoRequestNode *) x;
+}
+
+/*
+ * RBTree callback to combine an UndoRequestNode with another one.
+ *
+ * The key for every RBTree includes the FXID, which is unique, so we should
+ * never need to merge requests; raise an error if it happens anyway.
+ */
+static void
+UndoRequestNodeCombine(RBTNode *existing, const RBTNode *newdata, void *arg)
+{
+	elog(ERROR, "undo requests should never need to be combined");
+}
+
+/*
+ * RBTree comparator for requests_by_retry_time: ascending retry time, then
+ * ascending FXID.  The FXID tiebreak keeps distinct requests from ever
+ * comparing equal, which matters because we don't want to merge requests,
+ * and it also favors retiring older transactions first.
+ */
+static int
+UndoRequestNodeCompareRetryTime(const RBTNode *a, const RBTNode *b, void *arg)
+{
+	const UndoRequest *lhs = ((const UndoRequestNode *) a)->req;
+	const UndoRequest *rhs = ((const UndoRequestNode *) b)->req;
+
+	/* Primary key: retry time. */
+	if (lhs->retry_time < rhs->retry_time)
+		return -1;
+	if (lhs->retry_time > rhs->retry_time)
+		return 1;
+
+	/* Secondary key: FXID. */
+	if (FullTransactionIdPrecedes(lhs->fxid, rhs->fxid))
+		return -1;
+	if (FullTransactionIdPrecedes(rhs->fxid, lhs->fxid))
+		return 1;
+
+	return 0;
+}
+
+/*
+ * RBTree comparator for requests_by_fxid. Lower FXIDs first. No tiebreak,
+ * because FXIDs should be unique.
+ */
+static int
+UndoRequestNodeCompareFXID(const RBTNode *a, const RBTNode *b, void *arg)
+{
+	const UndoRequest *lhs = ((const UndoRequestNode *) a)->req;
+	const UndoRequest *rhs = ((const UndoRequestNode *) b)->req;
+
+	/* Order strictly by FXID. */
+	if (FullTransactionIdPrecedes(lhs->fxid, rhs->fxid))
+		return -1;
+	if (FullTransactionIdPrecedes(rhs->fxid, lhs->fxid))
+		return 1;
+
+	/* Neither precedes the other, so the FXIDs are equal. */
+	return 0;
+}
+
+/*
+ * RBTree comparator for requests_by_size. Larger sizes sort first; as we do
+ * for the retry time RBTree, break ties in favor of lower FXIDs.
+ */
+static int
+UndoRequestNodeCompareSize(const RBTNode *a, const RBTNode *b, void *arg)
+{
+	const UndoRequest *lhs = ((const UndoRequestNode *) a)->req;
+	const UndoRequest *rhs = ((const UndoRequestNode *) b)->req;
+
+	/* Primary key: size, descending, so bigger requests sort first. */
+	if (lhs->size > rhs->size)
+		return -1;
+	if (lhs->size < rhs->size)
+		return 1;
+
+	/* Secondary key: FXID, ascending. */
+	if (FullTransactionIdPrecedes(lhs->fxid, rhs->fxid))
+		return -1;
+	if (FullTransactionIdPrecedes(rhs->fxid, lhs->fxid))
+		return 1;
+
+	return 0;
+}
+
+/*
+ * Add an UndoRequest to a single RBTree.
+ *
+ * What the tree stores is an UndoRequestNode, which just carries a pointer
+ * to the UndoRequest itself.
+ */
+static void
+InsertUndoRequest(RBTree *rbt, UndoRequest *req)
+{
+	UndoRequestNode key;
+	bool		inserted;
+
+	/*
+	 * rbt_insert wants something shaped like an RBTNode, but (somewhat
+	 * oddly) the RBTNode part of what we pass need not be initialized: only
+	 * the "extra" data after it has to be correct.  So a stack-allocated
+	 * node with just the request pointer filled in will do.
+	 */
+	key.req = req;
+	rbt_insert(rbt, &key.rbtnode, &inserted);
+	Assert(inserted);
+}
+
+/*
+ * Remove an UndoRequest from one RBTree; it must currently be in that tree.
+ *
+ * This is the reverse of InsertUndoRequest, with the same interface quirk.
+ */
+static void
+RemoveUndoRequest(RBTree *rbt, UndoRequest *req)
+{
+	UndoRequestNode dummy;
+	RBTNode    *node;
+
+	dummy.req = req;
+	node = rbt_find(rbt, &dummy.rbtnode);
+	Assert(node != NULL);		/* caller promised the request is listed */
+	rbt_delete(rbt, node);
+}
+
+/*
+ * Look up an UndoRequest by FXID in requests_by_fxid.
+ *
+ * A hash table mapping FXID -> UndoRequest would be faster, but lookups are
+ * only needed after a system restart, so RBTree's O(lg n) performance seems
+ * good enough.
+ *
+ * Because only requests_by_fxid is searched, this cannot find a request
+ * that has failed (it would be in requests_by_retry_time instead) or one
+ * that is being processed (it would be in neither data structure).  Current
+ * callers can live with that restriction.  Returns NULL if nothing is
+ * found.
+ */
+static UndoRequest *
+FindUndoRequest(UndoRequestManager *urm, FullTransactionId fxid)
+{
+	UndoRequest probe_request;
+	UndoRequestNode probe_node;
+	RBTNode    *match;
+
+	/*
+	 * The search key has to be an UndoRequestNode pointing at an
+	 * UndoRequest, so build throwaway ones on the stack.  Only the
+	 * comparator examines them, and the only thing it reads is the
+	 * request's FXID.
+	 */
+	probe_request.fxid = fxid;
+	probe_node.req = &probe_request;
+	match = rbt_find(&urm->requests_by_fxid, &probe_node.rbtnode);
+
+	return match ? ((UndoRequestNode *) match)->req : NULL;
+}
diff --git a/src/backend/access/undo/undostate.c b/src/backend/access/undo/undostate.c
new file mode 100644
index 0000000000..9b44677af8
--- /dev/null
+++ b/src/backend/access/undo/undostate.c
@@ -0,0 +1,526 @@
+/*-------------------------------------------------------------------------
+ *
+ * undostate.c
+ *		Undo system state management and transaction integration.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undostate.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/undorequest.h"
+#include "access/undostate.h"
+#include "access/xact.h"
+#include "miscadmin.h"
+#include "storage/shmem.h"
+
+/*
+ * The capacity of the UndoRequestManager represents the maximum number of
+ * in-progress or aborted transactions that have written undo which still needs
+ * to be tracked.  Once an aborted transaction's undo actions have been
+ * executed, it no longer counts against this limit.
+ *
+ * We could make the multiplier or the absolute value user-settable, but for
+ * now we just hard-code the capacity as a fixed multiple of MaxBackends.
+ * Hopefully, we'll never get very close to this limit, because if we do,
+ * it means that the system is aborting transactions faster than the undo
+ * machinery can perform the undo actions.
+ */
+#define UNDO_CAPACITY_PER_BACKEND		10
+
+/*
+ * If the UndoRequestManager is almost full, then we start refusing all
+ * requests to perform undo in the background. Instead, the aborting
+ * transactions will need to execute their own undo actions.  The point is
+ * to avoid hitting the hard limit, at which stage we would have to start
+ * refusing undo-writing transactions completely. This constant represents
+ * the percentage of UndoRequestManager space that may be consumed before we
+ * hit the soft limit.
+ *
+ * Note that this should be set so that the remaining capacity when the limit
+ * is hit is at least MaxBackends; if this is done, it shouldn't be possible
+ * to hit the hard limit unless the system crashes at least once while the
+ * number of tracked transactions is already above the soft limit.  We set it
+ * a bit lower than that here so as to make it unlikely that we'll hit the
+ * hard limit even if there are multiple crashes.
+ */
+#define UNDO_SOFT_LIMIT_MULTIPLIER		0.85
+
+/* Per-subtransaction backend-private undo state. */
+typedef struct UndoSubTransaction
+{
+	SubTransactionId	nestingLevel;	/* nesting level this entry covers */
+	UndoRecPtr	start_location[UndoLogCategories];	/* first undo, per category */
+	struct UndoSubTransaction *next;	/* entry for a lower level, or NULL */
+} UndoSubTransaction;
+
+/* Backend-private undo state (but with pointers into shared memory). */
+typedef struct UndoStateData
+{
+	UndoRequestManager *manager;	/* shared manager, set in UndoStateShmemInit */
+	UndoRequest *my_request;	/* request this backend currently holds, or NULL */
+	bool	is_undo;	/* set while undo actions are being performed */
+	bool	is_background_undo;	/* set when working on a background request */
+	bool	has_undo;	/* has anything undo-related been done yet? */
+	UndoSubTransaction *subxact;	/* current (innermost) UndoSubTransaction */
+	UndoRecPtr last_location[UndoLogCategories];	/* start of latest record */
+	Size last_size[UndoLogCategories];	/* size of latest record */
+	Size total_size[UndoLogCategories];	/* sum of record sizes so far */
+} UndoStateData;
+
+UndoStateData UndoState;
+UndoSubTransaction UndoTopState;
+
+static void ResetUndoState(void);
+static UndoRecPtr GetUndoRecordEndPtr(UndoRecPtr start_location, Size size);
+
+/*
+ * Report the shared memory needed for undo state management: room for an
+ * UndoRequestManager with UNDO_CAPACITY_PER_BACKEND slots per backend.
+ */
+Size
+UndoStateShmemSize(void)
+{
+	return EstimateUndoRequestManagerSize(mul_size(UNDO_CAPACITY_PER_BACKEND,
+												   MaxBackends));
+}
+
+/*
+ * Attach to the shared UndoRequestManager, initializing it if we are the
+ * first to get here.  Capacity is computed exactly as in UndoStateShmemSize
+ * so that the two always agree.  Also resets this backend's local state.
+ */
+void
+UndoStateShmemInit(void)
+{
+	Size	capacity = mul_size(UNDO_CAPACITY_PER_BACKEND, MaxBackends);
+	Size	soft_limit = (Size) (capacity * UNDO_SOFT_LIMIT_MULTIPLIER);
+	Size	size = EstimateUndoRequestManagerSize(capacity);
+	bool	found;
+
+	UndoState.manager = (UndoRequestManager *)
+		ShmemInitStruct("undo request manager", size, &found);
+	if (!found)
+		InitializeUndoRequestManager(UndoState.manager, UndoRequestLock,
+									 capacity, soft_limit);
+	Assert(UndoState.my_request == NULL);
+	ResetUndoState();
+}
+
+/*
+ * Accumulate information about one undo record insertion within the current
+ * transaction.
+ *
+ * This must be called before every undo record insertion. We will need these
+ * details to decide what to do if the transaction aborts. It's important that this
+ * is called before the undo is actually inserted, because if we need to register
+ * an UndoRequest and fail to do so, the failure needs to occur while we still have
+ * no undo that will potentially require background processing.
+ */
+void
+UndoStateAccumulateRecord(UndoLogCategory category, UndoRecPtr start_location,
+						  Size size)
+{
+	int		nestingLevel = GetCurrentTransactionNestLevel();
+	UndoRecPtr *sub_start_location;
+
+	/* Remember that we've done something undo-related. */
+	UndoState.has_undo = true;
+
+	/* We should be connected to a database. */
+	Assert(OidIsValid(MyDatabaseId));
+
+	/* Register new UndoRequest if required for this persistence level. */
+	if (UndoState.my_request == NULL &&
+		(category == UNDO_PERMANENT || category == UNDO_UNLOGGED))
+		UndoState.my_request = RegisterUndoRequest(UndoState.manager,
+												   GetTopFullTransactionId(),
+												   MyDatabaseId);
+
+	/*
+	 * If we've entered a subtransaction, spin up a new UndoSubTransaction so that
+	 * we can track the start locations for the subtransaction separately from any
+	 * parent (sub)transactions, and make it the current entry.
+	 */
+	if (nestingLevel > UndoState.subxact->nestingLevel)
+	{
+		UndoSubTransaction *subxact;
+		int i;
+
+		subxact = MemoryContextAlloc(TopMemoryContext, sizeof(UndoSubTransaction));
+		subxact->nestingLevel = nestingLevel;
+		subxact->next = UndoState.subxact;
+		for (i = 0; i < UndoLogCategories; ++i)
+			subxact->start_location[i] = InvalidUndoRecPtr;
+		UndoState.subxact = subxact;	/* push: it's now the current level */
+	}
+
+	/*
+	 * If this is the first undo for this persistence level in this subtransaction,
+	 * record the start location.
+	 */
+	sub_start_location = &UndoState.subxact->start_location[category];
+	if (!UndoRecPtrIsValid(*sub_start_location))
+		*sub_start_location = start_location;
+
+	/*
+	 * Remember this as the last start location and record size for the persistence
+	 * level.
+	 */
+	UndoState.last_location[category] = start_location;
+	UndoState.last_size[category] = size;
+
+	/* Add to total size for persistence level. */
+	UndoState.total_size[category] += size;
+}
+
+/*
+ * Try to claim an UndoRequest for this backend to process in the background.
+ *
+ * Returns InvalidOid if there is nothing to do right now.  Otherwise returns
+ * the OID of the database the caller must be connected to in order to carry
+ * out the undo work.
+ *
+ * Once a database OID has been returned, a transaction abort reschedules
+ * the UndoRequest for later reprocessing, after which it is no longer this
+ * backend's responsibility.  A transaction commit, on the other hand, does
+ * not mark the request as successfully completed; the caller must invoke
+ * FinishBackgroundUndo for that.
+ *
+ * minimum_runtime_reached is passed straight to GetNextUndoRequest, q.v.
+ */
+Oid
+InitializeBackgroundUndoState(bool minimum_runtime_reached)
+{
+	Oid			target_dbid;
+	UndoRecPtr	logged_start;
+	UndoRecPtr	logged_end;
+	UndoRecPtr	unlogged_start;
+	UndoRecPtr	unlogged_end;
+
+	Assert(!UndoState.has_undo && !UndoState.is_undo);
+	Assert(UndoState.my_request == NULL);
+
+	UndoState.my_request =
+		GetNextUndoRequest(UndoState.manager, MyDatabaseId,
+						   minimum_runtime_reached, &target_dbid,
+						   &logged_start, &unlogged_start, &logged_end, &unlogged_end);
+	if (UndoState.my_request == NULL)
+		return InvalidOid;
+
+	UndoState.has_undo = true;
+	UndoState.is_undo = true;
+	UndoState.is_background_undo = true;
+
+	/*
+	 * Copy the request's extent into our state, one category at a time.  The
+	 * "last location" and "last size" stored here aren't really accurate; they
+	 * only serve to carry the end location through to the undo-applying code.
+	 */
+	UndoState.subxact->start_location[UNDO_PERMANENT] = logged_start;
+	UndoState.last_location[UNDO_PERMANENT] = logged_end;
+	UndoState.last_size[UNDO_PERMANENT] = 0;
+	UndoState.subxact->start_location[UNDO_UNLOGGED] = unlogged_start;
+	UndoState.last_location[UNDO_UNLOGGED] = unlogged_end;
+	UndoState.last_size[UNDO_UNLOGGED] = 0;
+
+	Assert(OidIsValid(target_dbid));
+	return target_dbid;
+}
+
+/*
+ * If background undo processing succeeds, call this function.
+ *
+ * It unregisters the undo request and resets this backend's undo state.
+ */
+void
+FinishBackgroundUndo(void)
+{
+	Assert(UndoState.is_background_undo);
+	Assert(UndoState.my_request != NULL);
+
+	UnregisterUndoRequest(UndoState.manager, UndoState.my_request);
+	ResetUndoState();
+}
+
+/*
+ * Perform undo actions.
+ *
+ * This function might be called either to process undo actions in the background
+ * or to perform foreground undo.  Caller must ensure that we have a valid
+ * transaction context so that it's safe for us to do things that might fail.
+ *
+ * Our job is to apply all undo for transaction nesting levels greater than or
+ * equal to the level supplied as an argument.
+ */
+void
+PerformUndoActions(int nestingLevel)
+{
+	/*
+	 * XXX. NOT IMPLEMENTED: currently a no-op that ignores nestingLevel.
+	 *
+	 * Invoke facilities to actually apply undo actions from here, passing the
+	 * relevant information from the UndoState so that they know what to do.
+	 *
+	 * In the case of subtransaction undo, this also needs to tear down the
+	 * relevant UndoSubTransaction (or else we need a separate entrypoint for
+	 * that). For a top-level transaction, AtCommit_UndoState() or
+	 * FinishBackgroundUndo() will take care of it.
+	 */
+}
+
+/*
+ * Clean up the undo state once a transaction has committed.
+ *
+ * NB: This runs as a post-commit cleanup step, so it MUST NOT FAIL.  Keep
+ * this function simple!
+ */
+void
+AtCommit_UndoState(void)
+{
+	/*
+	 * A commit during background undo processing doesn't by itself mean the
+	 * request is finished: the transaction may only have been used to connect
+	 * to the database, say.  Successful completion is reported separately by
+	 * client code, through FinishBackgroundUndo(), so leave everything alone
+	 * here.
+	 */
+	if (UndoState.is_background_undo)
+		return;
+
+	/* Shouldn't commit after beginning foreground undo. */
+	Assert(!UndoState.is_undo);
+
+	/* Only transactions that did something undo-related need cleanup. */
+	if (UndoState.has_undo)
+	{
+		/*
+		 * Our (foreground) transaction committed, so none of the undo we
+		 * wrote will ever need to have its undo actions performed.  That
+		 * makes it safe to unregister our UndoRequest, if we have one.
+		 */
+		if (UndoState.my_request != NULL)
+			UnregisterUndoRequest(UndoState.manager, UndoState.my_request);
+
+		/*
+		 * Reset state for next transaction; this clears my_request as well.
+		 */
+		ResetUndoState();
+	}
+}
+
+/*
+ * Post-abort cleanup of the undo state.
+ *
+ * Sets *perform_foreground_undo to tell the caller whether foreground undo
+ * is required, and makes sure we don't lose track of any UndoRequest we own.
+ */
+void
+AtAbort_UndoState(bool *perform_foreground_undo)
+{
+	bool	has_temporary_undo = false;
+	Size	request_size;
+	UndoRecPtr end_location_logged;
+	UndoRecPtr end_location_unlogged;
+
+	*perform_foreground_undo = false;
+
+	/* Exit quickly if this transaction generated no undo. */
+	if (!UndoState.has_undo)
+		return;
+
+	/* This is a toplevel abort, so collapse all subtransaction state. */
+	while (UndoState.subxact->next != NULL)
+	{
+		UndoSubTransaction *cursubxact = UndoState.subxact;
+		UndoSubTransaction *nextsubxact = cursubxact->next;
+		int	i;
+
+		for (i = 0; i < UndoLogCategories; ++i)
+			if (!UndoRecPtrIsValid(nextsubxact->start_location[i]))
+				nextsubxact->start_location[i] = cursubxact->start_location[i];
+		pfree(cursubxact);
+		UndoState.subxact = nextsubxact;
+	}
+
+	/* Figure out whether there is any temporary undo remaining to be processed. */
+	has_temporary_undo =
+		UndoRecPtrIsValid(UndoState.subxact->start_location[UNDO_TEMP]);
+
+	if (UndoState.is_undo)
+	{
+		/*
+		 * Regrettably, we seem to have failed while performing undo actions.
+		 * Reschedule any undo request for later background processing so that
+		 * we don't lose track of it; after that it is no longer ours.
+		 */
+		if (UndoState.my_request != NULL)
+			RescheduleUndoRequest(UndoState.manager, UndoState.my_request);
+		UndoState.my_request = NULL;
+
+		/*
+		 * Without temporary undo nothing is left for us to do, so reset.  XXX.
+		 * With temporary undo we're in big trouble: background workers can't
+		 * process it and apparently neither can we.  Throw FATAL?  Leave it
+		 * unapplied and somehow retry at a later point in the session?
+		 */
+		if (!has_temporary_undo)
+			ResetUndoState();
+
+		return;
+	}
+
+	/*
+	 * If we have no UndoRequest, then the caller must perform foreground undo if
+	 * we have any temporary undo.
+	 */
+	if (UndoState.my_request == NULL)
+	{
+		if (has_temporary_undo)
+			*perform_foreground_undo = true;
+		else
+			ResetUndoState();
+		return;
+	}
+
+	/*
+	 * Update UndoRequest details; background processing doesn't care about
+	 * our temporary undo.  NOTE(review): a category with no undo hands
+	 * InvalidUndoRecPtr to GetUndoRecordEndPtr -- confirm the result is sane.
+	 */
+	request_size = UndoState.total_size[UNDO_PERMANENT] +
+		UndoState.total_size[UNDO_UNLOGGED];
+	end_location_logged =
+		GetUndoRecordEndPtr(UndoState.last_location[UNDO_PERMANENT],
+							UndoState.last_size[UNDO_PERMANENT]);
+	end_location_unlogged =
+		GetUndoRecordEndPtr(UndoState.last_location[UNDO_UNLOGGED],
+							UndoState.last_size[UNDO_UNLOGGED]);
+	FinalizeUndoRequest(UndoState.manager, UndoState.my_request, request_size,
+						UndoState.subxact->start_location[UNDO_PERMANENT],
+						UndoState.subxact->start_location[UNDO_UNLOGGED],
+						end_location_logged,
+						end_location_unlogged);
+
+	/*
+	 * We have generated undo for permanent and/or unlogged tables.  Is it OK for
+	 * that work to get handled in the background?
+	 */
+	if (PerformUndoInBackground(UndoState.manager, UndoState.my_request))
+	{
+		if (!has_temporary_undo)
+		{
+			/* No temporary undo, and everything else in the background. */
+			ResetUndoState();
+			return;
+		}
+
+		/*
+		 * Permanent and unlogged undo in the background, but temporary undo is
+		 * still our problem.
+		 */
+		UndoState.my_request = NULL;
+		UndoState.subxact->start_location[UNDO_PERMANENT] = InvalidUndoRecPtr;
+		UndoState.subxact->start_location[UNDO_UNLOGGED] = InvalidUndoRecPtr;
+		UndoState.last_location[UNDO_PERMANENT] = InvalidUndoRecPtr;
+		UndoState.last_location[UNDO_UNLOGGED] = InvalidUndoRecPtr;
+		UndoState.last_size[UNDO_PERMANENT] = 0;
+		UndoState.last_size[UNDO_UNLOGGED] = 0;
+		UndoState.total_size[UNDO_PERMANENT] = 0;
+		UndoState.total_size[UNDO_UNLOGGED] = 0;
+	}
+
+	/* Caller needs to initiate foreground undo. */
+	*perform_foreground_undo = true;
+}
+
+/*
+ * Undo state cleanup after a subtransaction commits at the given level.
+ *
+ * Like AtCommit_UndoState, this must not fail.
+ */
+void
+AtSubCommit_UndoState(int level)
+{
+	UndoSubTransaction *child = UndoState.subxact;
+	UndoSubTransaction *parent = child->next;
+	int		category;
+
+	/* Nothing to do unless this subtransaction itself recorded undo. */
+	if (!UndoState.has_undo || child->nestingLevel < level)
+		return;
+
+	/* If this fails, some other subtransaction failed to clean up properly. */
+	Assert(child->nestingLevel == level);
+
+	/* If this fails, things are really messed up. */
+	Assert(parent->nestingLevel < child->nestingLevel);
+
+	/*
+	 * When the enclosing level has no entry of its own, ours simply becomes
+	 * the entry for that level.
+	 */
+	if (parent->nestingLevel < child->nestingLevel - 1)
+	{
+		child->nestingLevel--;
+		return;
+	}
+
+	/* Otherwise fold our start locations into the parent's and free ours. */
+	for (category = 0; category < UndoLogCategories; ++category)
+		if (!UndoRecPtrIsValid(parent->start_location[category]))
+			parent->start_location[category] = child->start_location[category];
+	UndoState.subxact = parent;
+	pfree(child);
+}
+
+void
+AtSubAbort_UndoState(int level, bool *perform_foreground_undo)
+{
+	/* XXX Not implemented yet; *perform_foreground_undo is left untouched. */
+}
+
+/*
+ * Return backend-local undo state to its idle form for the next transaction.
+ */
+static void
+ResetUndoState(void)
+{
+	int		category;
+
+	UndoState.my_request = NULL;
+	UndoState.has_undo = false;
+	UndoState.is_undo = false;
+	UndoState.is_background_undo = false;
+	UndoTopState.nestingLevel = 1;
+	UndoTopState.next = NULL;
+	UndoState.subxact = &UndoTopState;
+
+	for (category = 0; category < UndoLogCategories; ++category)
+	{
+		UndoState.total_size[category] = 0;
+		UndoState.last_size[category] = 0;
+		UndoState.last_location[category] = InvalidUndoRecPtr;
+		UndoTopState.start_location[category] = InvalidUndoRecPtr;
+	}
+}
+
+/*
+ * Compute where an undo record ends, given where it starts and its size.
+ * The offset is advanced by size usable bytes; the log number is kept.
+ */
+static UndoRecPtr
+GetUndoRecordEndPtr(UndoRecPtr start_location, Size size)
+{
+	UndoLogOffset	end_offset;
+
+	end_offset = UndoLogOffsetPlusUsableBytes(UndoRecPtrGetOffset(start_location),
+											  size);
+	return MakeUndoRecPtr(UndoRecPtrGetLogNo(start_location), end_offset);
+}
diff --git a/src/backend/lib/rbtree.c b/src/backend/lib/rbtree.c
index 33181e9211..bda870eab7 100644
--- a/src/backend/lib/rbtree.c
+++ b/src/backend/lib/rbtree.c
@@ -35,25 +35,6 @@
 #define RBTBLACK	(0)
 #define RBTRED		(1)
 
-/*
- * RBTree control structure
- */
-struct RBTree
-{
-	RBTNode    *root;			/* root node, or RBTNIL if tree is empty */
-
-	/* Remaining fields are constant after rbt_create */
-
-	Size		node_size;		/* actual size of tree nodes */
-	/* The caller-supplied manipulation functions */
-	rbt_comparator comparator;
-	rbt_combiner combiner;
-	rbt_allocfunc allocfunc;
-	rbt_freefunc freefunc;
-	/* Passthrough arg passed to all manipulation functions */
-	void	   *arg;
-};
-
 /*
  * all leafs are sentinels, use customized NIL name to prevent
  * collision with system-wide constant NIL which is actually NULL
@@ -122,6 +103,33 @@ rbt_create(Size node_size,
 	return tree;
 }
 
+/*
+ * rbt_initialize: initialize an empty RBTree
+ *
+ * This is just like rbt_create, except that the caller is responsible for
+ * allocating the memory.
+ */
+void
+rbt_initialize(RBTree *rbt,
+			   Size node_size,
+			   rbt_comparator comparator,
+			   rbt_combiner combiner,
+			   rbt_allocfunc allocfunc,
+			   rbt_freefunc freefunc,
+			   void *arg)
+{
+	Assert(node_size > sizeof(RBTNode));
+
+	rbt->root = RBTNIL;
+	rbt->node_size = node_size;
+	rbt->comparator = comparator;
+	rbt->combiner = combiner;
+	rbt->allocfunc = allocfunc;
+	rbt->freefunc = freefunc;
+
+	rbt->arg = arg;
+}
+
 /* Copy the additional data fields from one RBTNode to another */
 static inline void
 rbt_copy_data(RBTree *rbt, RBTNode *dest, const RBTNode *src)
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 12c324925c..0754269982 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "access/undolog.h"
+#include "access/undostate.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -149,6 +150,7 @@ CreateSharedMemoryAndSemaphores(int port)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, UndoStateShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -266,6 +268,7 @@ CreateSharedMemoryAndSemaphores(int port)
 	BTreeShmemInit();
 	SyncScanShmemInit();
 	AsyncShmemInit();
+	UndoStateShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/lmgr/lwlocknames.txt b/src/backend/storage/lmgr/lwlocknames.txt
index 4b42a1cf0b..aee8d3eba0 100644
--- a/src/backend/storage/lmgr/lwlocknames.txt
+++ b/src/backend/storage/lmgr/lwlocknames.txt
@@ -50,3 +50,4 @@ OldSnapshotTimeMapLock				42
 LogicalRepWorkerLock				43
 CLogTruncationLock					44
 UndoLogLock                                      45
+UndoRequestLock						46
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 33fd052156..cc00509699 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -47,6 +47,7 @@
 #define EpochFromFullTransactionId(x)	((uint32) ((x).value >> 32))
 #define XidFromFullTransactionId(x)		((uint32) (x).value)
 #define U64FromFullTransactionId(x)		((x).value)
+#define FullTransactionIdEquals(a, b)	((a).value == (b).value)
 #define FullTransactionIdPrecedes(a, b)	((a).value < (b).value)
 #define FullTransactionIdIsValid(x)		TransactionIdIsValid(XidFromFullTransactionId(x))
 #define InvalidFullTransactionId		FullTransactionIdFromEpochAndXid(0, InvalidTransactionId)
diff --git a/src/include/access/undorequest.h b/src/include/access/undorequest.h
new file mode 100644
index 0000000000..4ab28f6772
--- /dev/null
+++ b/src/include/access/undorequest.h
@@ -0,0 +1,76 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorequest.h
+ *		Undo request manager.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorequest.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOREQUEST_H
+#define UNDOREQUEST_H
+
+#include "access/transam.h"
+#include "access/undolog.h"
+#include "datatype/timestamp.h"
+
+struct UndoRequest;
+struct UndoRequestManager;
+typedef struct UndoRequest UndoRequest;
+typedef struct UndoRequestManager UndoRequestManager;
+
+/* Initialization functions. */
+extern Size EstimateUndoRequestManagerSize(Size capacity);
+extern void InitializeUndoRequestManager(UndoRequestManager *urm,
+										 LWLock *lock, Size capacity,
+										 Size soft_limit);
+
+/* Call this before inserting undo records. */
+extern UndoRequest *RegisterUndoRequest(UndoRequestManager *urm,
+										FullTransactionId fxid,
+										Oid dbid);
+
+/* Remember undo size and start/end locations. */
+extern void FinalizeUndoRequest(UndoRequestManager *urm,
+								UndoRequest *req,
+								Size size,
+								UndoRecPtr start_location_logged,
+								UndoRecPtr start_location_unlogged,
+								UndoRecPtr end_location_logged,
+								UndoRecPtr end_location_unlogged);
+
+/* Forget about an UndoRequest we don't need any more. */
+extern void UnregisterUndoRequest(UndoRequestManager *urm, UndoRequest *req);
+
+/* Attempt to dispatch UndoRequest for background processing. */
+extern bool PerformUndoInBackground(UndoRequestManager *urm, UndoRequest *req);
+
+/* Get work for background undo process. */
+extern UndoRequest *GetNextUndoRequest(UndoRequestManager *urm, Oid dbid,
+									   bool minimum_runtime_reached,
+									   Oid *out_dbid,
+									   UndoRecPtr *start_location_logged,
+									   UndoRecPtr *end_location_logged,
+									   UndoRecPtr *start_location_unlogged,
+									   UndoRecPtr *end_location_unlogged);
+
+/* Reschedule failed undo attempt. */
+extern void RescheduleUndoRequest(UndoRequestManager *urm, UndoRequest *req);
+
+/* Restore state after crash. */
+extern bool RecreateUndoRequest(UndoRequestManager *urm,
+								FullTransactionId fxid, Oid dbid,
+								bool is_logged,
+								UndoRecPtr start_location,
+								UndoRecPtr end_location,
+								Size size);
+extern UndoRequest *SuspendPreparedUndoRequest(UndoRequestManager *urm,
+											   FullTransactionId fxid);
+
+/* Get oldest registered FXID. */
+extern FullTransactionId UndoRequestManagerOldestFXID(UndoRequestManager *urm);
+
+#endif
diff --git a/src/include/access/undostate.h b/src/include/access/undostate.h
new file mode 100644
index 0000000000..44118c24dc
--- /dev/null
+++ b/src/include/access/undostate.h
@@ -0,0 +1,37 @@
+/*-------------------------------------------------------------------------
+ *
+ * undostate.h
+ *		Undo system state management and transaction integration.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undostate.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOSTATE_H
+#define UNDOSTATE_H
+
+#include "access/undolog.h"
+
+extern Size UndoStateShmemSize(void);
+extern void UndoStateShmemInit(void);
+
+extern void UndoStateAccumulateRecord(UndoLogCategory category,
+									  UndoRecPtr start_location,
+									  Size size);
+
+extern Oid InitializeBackgroundUndoState(bool minimum_runtime_reached);
+extern void FinishBackgroundUndo(void);
+
+extern void PerformUndoActions(int nestingLevel);
+
+extern void AtCommit_UndoState(void);
+extern void AtAbort_UndoState(bool *perform_foreground_undo);
+extern void AtSubCommit_UndoState(int level);
+extern void AtSubAbort_UndoState(int level, bool *perform_foreground_undo);
+
+/* XXX what about prepare? */
+
+#endif
diff --git a/src/include/lib/rbtree.h b/src/include/lib/rbtree.h
index 6d79a24015..ff6f99a932 100644
--- a/src/include/lib/rbtree.h
+++ b/src/include/lib/rbtree.h
@@ -28,8 +28,33 @@ typedef struct RBTNode
 	struct RBTNode *parent;		/* parent, or NULL (not RBTNIL!) if none */
 } RBTNode;
 
-/* Opaque struct representing a whole tree */
-typedef struct RBTree RBTree;
+/* Support functions to be provided by caller */
+typedef int (*rbt_comparator) (const RBTNode *a, const RBTNode *b, void *arg);
+typedef void (*rbt_combiner) (RBTNode *existing, const RBTNode *newdata, void *arg);
+typedef RBTNode *(*rbt_allocfunc) (void *arg);
+typedef void (*rbt_freefunc) (RBTNode *x, void *arg);
+
+/*
+ * RBTree control structure
+ *
+ * This is declared here to make it possible to preallocate an object of
+ * the correct size, but callers should not access the members directly.
+ */
+typedef struct RBTree
+{
+	RBTNode    *root;			/* root node, or RBTNIL if tree is empty */
+
+	/* Remaining fields are constant after rbt_create/rbt_initialize */
+
+	Size		node_size;		/* actual size of tree nodes */
+	/* The caller-supplied manipulation functions */
+	rbt_comparator comparator;
+	rbt_combiner combiner;
+	rbt_allocfunc allocfunc;
+	rbt_freefunc freefunc;
+	/* Passthrough arg passed to all manipulation functions */
+	void	   *arg;
+} RBTree;
 
 /* Available tree iteration orderings */
 typedef enum RBTOrderControl
@@ -53,18 +78,19 @@ struct RBTreeIterator
 	bool		is_over;
 };
 
-/* Support functions to be provided by caller */
-typedef int (*rbt_comparator) (const RBTNode *a, const RBTNode *b, void *arg);
-typedef void (*rbt_combiner) (RBTNode *existing, const RBTNode *newdata, void *arg);
-typedef RBTNode *(*rbt_allocfunc) (void *arg);
-typedef void (*rbt_freefunc) (RBTNode *x, void *arg);
-
 extern RBTree *rbt_create(Size node_size,
 						  rbt_comparator comparator,
 						  rbt_combiner combiner,
 						  rbt_allocfunc allocfunc,
 						  rbt_freefunc freefunc,
 						  void *arg);
+extern void rbt_initialize(RBTree *rbt,
+						   Size node_size,
+						   rbt_comparator comparator,
+						   rbt_combiner combiner,
+						   rbt_allocfunc allocfunc,
+						   rbt_freefunc freefunc,
+						   void *arg);
 
 extern RBTNode *rbt_find(RBTree *rbt, const RBTNode *data);
 extern RBTNode *rbt_leftmost(RBTree *rbt);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 60d6d7be1b..f32afffab1 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -19,6 +19,7 @@ SUBDIRS = \
 		  test_rbtree \
 		  test_rls_hooks \
 		  test_shm_mq \
+		  test_undo_request_manager \
 		  unsafe_tests \
 		  worker_spi
 
diff --git a/src/test/modules/test_undo_request_manager/.gitignore b/src/test/modules/test_undo_request_manager/.gitignore
new file mode 100644
index 0000000000..5dcb3ff972
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/.gitignore
@@ -0,0 +1,4 @@
+# Generated subdirectories
+/log/
+/results/
+/tmp_check/
diff --git a/src/test/modules/test_undo_request_manager/Makefile b/src/test/modules/test_undo_request_manager/Makefile
new file mode 100644
index 0000000000..5bc4695004
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/Makefile
@@ -0,0 +1,21 @@
+# src/test/modules/test_undo_request_manager/Makefile
+
+MODULE_big = test_undo_request_manager
+OBJS = test_undo_request_manager.o $(WIN32RES)
+PGFILEDESC = "test_undo_request_manager - test undo request manager code"
+
+EXTENSION = test_undo_request_manager
+DATA = test_undo_request_manager--1.0.sql
+
+REGRESS = test_undo_request_manager
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_undo_request_manager
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out b/src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out
new file mode 100644
index 0000000000..c79611b3b6
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out
@@ -0,0 +1,28 @@
+CREATE EXTENSION test_undo_request_manager;
+-- not enough space
+select urm_simple_test(1, '{10000,20000}');
+ERROR:  unable to register undo request #2
+-- simple case
+select urm_simple_test(2, '{10000,20000}');
+ urm_simple_test 
+-----------------
+ {1001,1002}
+(1 row)
+
+-- should alternate between early and large requests in order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,1000000,1000000,1000000}');
+                urm_simple_test                 
+------------------------------------------------
+ {1001,1006,1002,1007,1003,1008,1004,1009,1005}
+(1 row)
+
+-- should alternate between early and large requests, but the large requests
+-- should be processed in reverse order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,2000000,3000000,4000000,50000000}');
+                   urm_simple_test                   
+-----------------------------------------------------
+ {1001,1010,1002,1009,1003,1008,1004,1007,1005,1006}
+(1 row)
+
diff --git a/src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql b/src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql
new file mode 100644
index 0000000000..6611e040b6
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql
@@ -0,0 +1,16 @@
+CREATE EXTENSION test_undo_request_manager;
+
+-- not enough space
+select urm_simple_test(1, '{10000,20000}');
+
+-- simple case
+select urm_simple_test(2, '{10000,20000}');
+
+-- should alternate between early and large requests in order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,1000000,1000000,1000000}');
+
+-- should alternate between early and large requests, but the large requests
+-- should be processed in reverse order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,2000000,3000000,4000000,50000000}');
diff --git a/src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql b/src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql
new file mode 100644
index 0000000000..30ff471c23
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql
@@ -0,0 +1,9 @@
+/* src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_undo_request_manager" to load this file. \quit
+
+CREATE FUNCTION urm_simple_test(capacity pg_catalog.int4,
+								requests pg_catalog.int8[])
+    RETURNS pg_catalog.int8[] STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_undo_request_manager/test_undo_request_manager.c b/src/test/modules/test_undo_request_manager/test_undo_request_manager.c
new file mode 100644
index 0000000000..8b994283c8
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/test_undo_request_manager.c
@@ -0,0 +1,139 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_undo_request_manager.c
+ *		Test undo request manager.
+ *
+ * Copyright (c) 2013-2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_undo_request_manager/test_undo_request_manager.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/undorequest.h"
+#include "catalog/pg_type_d.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "utils/array.h"
+
+PG_MODULE_MAGIC;
+PG_FUNCTION_INFO_V1(urm_simple_test);
+
+/*
+ * SQL-callable test function.  We create an UndoRequestManager in
+ * backend-private memory here and exercise it a bit to see if it breaks.
+ *
+ * The first argument is the capacity of the UndoRequestManager as an integer.
+ * The second argument is a 1-dimensional bigint array, where each element
+ * is a hypothetical undo size.
+ *
+ * This function registers and inserts all the requests (failing if space is
+ * exhausted) with fake, sequentially assigned transaction IDs, and then
+ * fetches them back one by one. The return value is an array of fake
+ * transaction IDs in the order they were returned.
+ *
+ * This test doesn't simulate undo failure, multi-database operation, or
+ * prepared transactions.
+ */
+Datum
+urm_simple_test(PG_FUNCTION_ARGS)
+{
+	int64	capacity = PG_GETARG_INT32(0);
+	ArrayType *array = PG_GETARG_ARRAYTYPE_P(1);
+	Datum	  *darray;
+	int			nentries;
+	Datum	  *dresult;
+	ArrayType *result;
+	UndoRequestManager *urm;
+	const UndoRecPtr SomeValidUndoRecPtr = InvalidUndoRecPtr + 1;
+	int			i;
+	FullTransactionId fake_fxid = FullTransactionIdFromEpochAndXid(0, 1000);
+
+	/* Require positive capacity. */
+	if (capacity <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("undo request manager capacity must be a positive integer")));
+
+	/* Sanity-check and deconstruct array. */
+	if (ARR_NDIM(array) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_ARRAY_ELEMENT_ERROR),
+				 errmsg("array must have exactly 1 dimension")));
+	if (array_contains_nulls(array))
+		ereport(ERROR,
+				(errcode(ERRCODE_ARRAY_ELEMENT_ERROR),
+				 errmsg("cannot work with arrays containing NULLs")));
+	deconstruct_array(array, INT8OID, 8, FLOAT8PASSBYVAL, 'd',
+					  &darray, NULL, &nentries);
+
+	/*
+	 * Initialize UndoRequestManager. We have to supply an LWLock; rather than
+	 * creating a new one somewhere, just use our own backendLock. These locks
+	 * aren't that heavily trafficked and we won't have any reason to take it
+	 * for any other purpose while the UndoRequestManager holds it, so this
+	 * should be safe enough.
+	 *
+	 * We make the soft limit equal to the full capacity here for testing
+	 * purposes, which means that we should always succeed in dispatching to
+	 * the background.
+	 */
+	urm = palloc(EstimateUndoRequestManagerSize(capacity));
+	InitializeUndoRequestManager(urm, &MyProc->backendLock,
+								 capacity, capacity);
+
+	/* Insert entries as provided by caller. */
+	/* NOTE(review): calls below may not match undorequest.h; verify. */
+	for (i = 0; i < nentries; ++i)
+	{
+		int64	size = DatumGetInt64(darray[i]);
+		UndoRequest *req;
+
+		FullTransactionIdAdvance(&fake_fxid);
+
+		req = RegisterUndoRequest(urm, fake_fxid, MyDatabaseId);
+		if (req == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("unable to register undo request #%d", i + 1)));
+		UndoRequestSetStartLocation(urm, req, true, SomeValidUndoRecPtr);
+		FinalizeUndoRequest(urm, req, size,
+							SomeValidUndoRecPtr,
+							InvalidUndoRecPtr);
+		if (!PerformUndoInBackground(urm, req))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("unable to background undo request #%d", i + 1)));
+	}
+
+	/* Now get the entries back. */
+	dresult = palloc(nentries * sizeof(Datum));
+	for (i = 0; true; ++i)
+	{
+		UndoRequest *req;
+		UndoRecPtr	p[4];
+
+		/* Get some work. */
+		req = GetNextUndoRequest(urm, MyDatabaseId, true,
+								 &fake_fxid, &p[0], &p[1], &p[2], &p[3]);
+		if (req == NULL)
+			break;
+		if (i >= nentries)
+			elog(ERROR, "found more undo requests than were inserted");
+
+		/* Save the fake FXID. */
+		dresult[i] =
+			Int64GetDatum((int64) U64FromFullTransactionId(fake_fxid));
+
+		/* Report that we successfully processed the imaginary undo. */
+		UnregisterUndoRequest(urm, req);
+	}
+
+	/* Put result into array form. */
+	result = construct_array(dresult, i, INT8OID, 8, FLOAT8PASSBYVAL, 'd');
+	PG_RETURN_ARRAYTYPE_P(result);
+}
diff --git a/src/test/modules/test_undo_request_manager/test_undo_request_manager.control b/src/test/modules/test_undo_request_manager/test_undo_request_manager.control
new file mode 100644
index 0000000000..0a340e9843
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/test_undo_request_manager.control
@@ -0,0 +1,4 @@
+comment = 'Test code for undo request manager'
+default_version = '1.0'
+module_pathname = '$libdir/test_undo_request_manager'
+relocatable = true
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 432d2d812e..e0049245a9 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2536,6 +2536,11 @@ ULONG
 ULONG_PTR
 UV
 UVersionInfo
+UndoRecPtr
+UndoRequest
+UndoRequestManager
+UndoRequestNode
+UndoRequestSource
 Unique
 UniquePath
 UniquePathMethod
-- 
2.17.2 (Apple Git-113)

