From 3bab0e97a0328a6ab764e817f532b21e31f5fd60 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 8 Aug 2019 16:05:53 -0400
Subject: [PATCH] Draft of new undo request manager.

---
 src/backend/access/undo/Makefile              |    2 +-
 src/backend/access/undo/undorequest.c         | 1030 +++++++++++++++++
 src/backend/lib/rbtree.c                      |   46 +-
 src/include/access/undorequest.h              |   77 ++
 src/include/lib/rbtree.h                      |   42 +-
 src/test/modules/Makefile                     |    1 +
 .../test_undo_request_manager/Makefile        |   21 +
 .../expected/test_undo_request_manager.out    |   28 +
 .../sql/test_undo_request_manager.sql         |   16 +
 .../test_undo_request_manager--1.0.sql        |    9 +
 .../test_undo_request_manager.c               |  139 +++
 .../test_undo_request_manager.control         |    4 +
 12 files changed, 1387 insertions(+), 28 deletions(-)
 create mode 100644 src/backend/access/undo/undorequest.c
 create mode 100644 src/include/access/undorequest.h
 create mode 100644 src/test/modules/test_undo_request_manager/Makefile
 create mode 100644 src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out
 create mode 100644 src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql
 create mode 100644 src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql
 create mode 100644 src/test/modules/test_undo_request_manager/test_undo_request_manager.c
 create mode 100644 src/test/modules/test_undo_request_manager/test_undo_request_manager.control

diff --git a/src/backend/access/undo/Makefile b/src/backend/access/undo/Makefile
index 219c6963cf..d036717671 100644
--- a/src/backend/access/undo/Makefile
+++ b/src/backend/access/undo/Makefile
@@ -12,6 +12,6 @@ subdir = src/backend/access/undo
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = undolog.o
+OBJS = undolog.o undorequest.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/undo/undorequest.c b/src/backend/access/undo/undorequest.c
new file mode 100644
index 0000000000..cc6bb5decc
--- /dev/null
+++ b/src/backend/access/undo/undorequest.c
@@ -0,0 +1,1030 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorequest.c
+ *		Undo request manager.
+ *
+ * From the moment a transaction begins until the moment that it commits,
+ * there is a possibility that it might abort, either due to an exception
+ * or because the entire system is restarted (e.g. because of a power
+ * cut). If this happens, all undo generated by that transaction prior
+ * to the abort must be applied. It's possible that many transactions
+ * might abort at around the same time; if so, it may be necessary to
+ * choose among them. This code is in charge of
+ * deciding which transactions should be undone first.
+ *
+ * We ensure that every transaction which aborts gets undone by
+ * registering an "undo request" for it.  The undo request should be
+ * registered before the transaction writes any undo records (except for
+ * temporary undo records, which the creating backend will need to
+ * process locally). If the transaction goes on to commit, the undo
+ * request can be deleted; if it goes on to abort, it needs to be updated
+ * with the final size of the undo generated by that transaction so that
+ * we can prioritize it appropriately. After a system restart, undo
+ * requests for any transactions that were pending undo prior to the
+ * restart need to be reregistered, and any transactions aborted by the
+ * crash need to be registered as well, but this code is not responsible
+ * for those tasks; it just manages the requests that are handed to it.
+ *
+ * We have only a fixed amount of shared memory to store undo requests;
+ * because an undo request has to be created before any undo that might
+ * need to be processed is written, we should never end up in a situation
+ * where there are more existing undo requests than can fit. In extreme
+ * cases, this might cause us to have to refuse to create new requests,
+ * but that should be very rare.  If we're starting to run low on space,
+ * PerformUndoInBackground() will signal callers that undo should be
+ * performed in the foreground; actually hitting the hard limit requires
+ * foreground undo to be interrupted by a crash.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/access/undo/undorequest.c
+ *
+ *-------------------------------------------------------------------------
+ */
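+
+/*
+ * As a sketch of the expected calling pattern for a typical non-prepared
+ * transaction (names such as fxid, dbid, start_location, size, end_logged,
+ * and end_unlogged are placeholders; error handling and the details of
+ * writing undo records are omitted):
+ *
+ *		UndoRequest *req = RegisterUndoRequest(urm, fxid, dbid);
+ *
+ *		UndoRequestSetStartLocation(urm, req, true, start_location);
+ *		... write undo records, then try to commit ...
+ *		if (committed)
+ *			UnregisterUndoRequest(urm, req);
+ *		else
+ *		{
+ *			FinalizeUndoRequest(urm, req, size, end_logged, end_unlogged);
+ *			if (!PerformUndoInBackground(urm, req))
+ *			{
+ *				... perform the undo actions in the foreground ...
+ *				... then UnregisterUndoRequest(urm, req) on success,
+ *				... or RescheduleUndoRequest(urm, req) on failure.
+ *			}
+ *		}
+ */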
+
+#include "postgres.h"
+
+#include "access/undorequest.h"
+#include "lib/rbtree.h"
+#include "storage/shmem.h"
+#include "utils/timestamp.h"
+
+/*
+ * An UndoRequest represents the possible need to perform undo actions for
+ * a transaction if it aborts; thus, it should be allocated before writing
+ * undo that might require the system to perform cleanup actions (except
+ * temporary undo, for which the backend is always responsible) and
+ * deallocated when it is clear that no such actions will need to be
+ * performed or when they have all been performed successfully.
+ */
+struct UndoRequest
+{
+	FullTransactionId	fxid;
+	Oid			dbid;
+	Size		size;
+	UndoRecPtr	start_location_logged;
+	UndoRecPtr	end_location_logged;
+	UndoRecPtr	start_location_unlogged;
+	UndoRecPtr	end_location_unlogged;
+	TimestampTz	retry_time;
+};
+
+/*
+ * An UndoRequestNode just points to an UndoRequest. We use it so that the
+ * same UndoRequest can be placed into more than one RBTree at the same
+ * time.
+ */
+typedef struct UndoRequestNode
+{
+	RBTNode		rbtnode;
+	UndoRequest *req;
+} UndoRequestNode;
+
+/*
+ * Possible sources of UndoRequest objects in need of processing.
+ */
+typedef enum UndoRequestSource
+{
+	UNDO_SOURCE_FXID,
+	UNDO_SOURCE_SIZE,
+	UNDO_SOURCE_RETRY_TIME
+} UndoRequestSource;
+
+/*
+ * An UndoRequestManager manages a collection of UndoRequest and
+ * UndoRequestNode objects. Typically, there would only be one such object
+ * for the whole system, but it's possible to create others for testing
+ * purposes.
+ */
+struct UndoRequestManager
+{
+	LWLock	   *lock;
+	Size		capacity;
+	Size		utilization;
+	Size		soft_size_limit;
+	UndoRequestSource	source;
+	RBTree		requests_by_fxid;
+	RBTree		requests_by_size;
+	RBTree		requests_by_retry_time;
+	UndoRequest *first_free_request;
+	UndoRequestNode *first_free_request_node;
+};
+
+/* Static functions. */
+static UndoRequest *FindUndoRequestForDatabase(UndoRequestManager *urm,
+											   Oid dbid);
+static bool BackgroundUndoOK(UndoRequestManager *urm,
+							 UndoRequest *req);
+static RBTNode *UndoRequestNodeAllocate(void *arg);
+static void UndoRequestNodeFree(RBTNode *x, void *arg);
+static void UndoRequestNodeCombine(RBTNode *existing, const RBTNode *newdata,
+								   void *arg);
+static int UndoRequestNodeCompareRetryTime(const RBTNode *a,
+										   const RBTNode *b,
+										   void *arg);
+static int UndoRequestNodeCompareFXID(const RBTNode *a, const RBTNode *b,
+									  void *arg);
+static int UndoRequestNodeCompareSize(const RBTNode *a, const RBTNode *b,
+									  void *arg);
+static void InsertUndoRequest(RBTree *rbt, UndoRequest *req);
+static void RemoveUndoRequest(RBTree *rbt, UndoRequest *req);
+static UndoRequest *FindUndoRequest(UndoRequestManager *urm,
+									FullTransactionId fxid);
+
+/*
+ * Compute the amount of space that will be needed by an undo request manager.
+ *
+ * We need space for the UndoRequestManager itself, for the UndoRequests, and
+ * for the UndoRequestNode objects.  We need twice as many UndoRequestNode
+ * objects as we do UndoRequest objects, because unfailed requests are stored
+ * in both requests_by_fxid and requests_by_size; failed requests are stored
+ * only in requests_by_retry_time.
+ */
+Size
+EstimateUndoRequestManagerSize(Size capacity)
+{
+	Size	s = MAXALIGN(sizeof(UndoRequestManager));
+
+	s = add_size(s, MAXALIGN(mul_size(capacity, sizeof(UndoRequest))));
+	s = add_size(s, MAXALIGN(mul_size(capacity,
+									  mul_size(2, sizeof(UndoRequestNode)))));
+
+	return s;
+}
+
+/*
+ * Initialize an undo request manager.
+ *
+ * The caller is responsible for providing an appropriately-sized chunk of
+ * memory; use EstimateUndoRequestManagerSize to find out how much space will
+ * be needed. This means that this infrastructure can potentially be used in
+ * either shared memory or, if desired, in backend-private memory. It will not
+ * work in DSM, though, because it uses pointers.
+ *
+ * The caller must also provide a lock that will be used to protect access
+ * to the data managed by this undo request manager.  This cannot be NULL,
+ * even if the memory is private.
+ */
+void
+InitializeUndoRequestManager(UndoRequestManager *urm, LWLock *lock,
+							 Size capacity, Size soft_limit)
+{
+	UndoRequest *reqs;
+	UndoRequestNode *nodes;
+	int		i;
+
+	/* Basic initialization. */
+	urm->lock = lock;
+	urm->capacity = capacity;
+	urm->utilization = 0;
+	urm->soft_size_limit = soft_limit;
+	urm->source = UNDO_SOURCE_FXID;
+	rbt_initialize(&urm->requests_by_fxid, sizeof(UndoRequestNode),
+				   UndoRequestNodeCompareFXID, UndoRequestNodeCombine,
+				   UndoRequestNodeAllocate, UndoRequestNodeFree, urm);
+	rbt_initialize(&urm->requests_by_size, sizeof(UndoRequestNode),
+				   UndoRequestNodeCompareSize, UndoRequestNodeCombine,
+				   UndoRequestNodeAllocate, UndoRequestNodeFree, urm);
+	rbt_initialize(&urm->requests_by_retry_time, sizeof(UndoRequestNode),
+				   UndoRequestNodeCompareRetryTime, UndoRequestNodeCombine,
+				   UndoRequestNodeAllocate, UndoRequestNodeFree, urm);
+
+	/* Find memory for UndoRequest and UndoRequestNode arenas. */
+	reqs = (UndoRequest *)
+		(((char *) urm) + MAXALIGN(sizeof(UndoRequestManager)));
+	nodes = (UndoRequestNode *)
+		(((char *) reqs) + MAXALIGN(capacity * sizeof(UndoRequest)));
+
+	/*
+	 * Build a free list of UndoRequest objects. We use the first few bytes
+	 * of the free object to store a pointer to the next free object.
+	 */
+	StaticAssertStmt(sizeof(UndoRequest) >= sizeof(UndoRequest *),
+					 "UndoRequest is too small");
+	urm->first_free_request = reqs;
+	for (i = 0; i < capacity - 1; ++i)
+	{
+		UndoRequest *current = &reqs[i];
+		UndoRequest *next = &reqs[i + 1];
+
+		*(UndoRequest **) current = next;
+	}
+	*(UndoRequest **) &reqs[capacity - 1] = NULL;
+
+	/* Similarly, build a free list of UndoRequestNode objects. */
+	StaticAssertStmt(sizeof(UndoRequestNode) >= sizeof(UndoRequestNode *),
+					 "UndoRequestNode is too small");
+	urm->first_free_request_node = nodes;
+	for (i = 0; i < 2 * capacity - 1; ++i)
+	{
+		UndoRequestNode *current = &nodes[i];
+		UndoRequestNode *next = &nodes[i + 1];
+
+		*(UndoRequestNode **) current = next;
+	}
+	*(UndoRequestNode **) &nodes[2 * capacity - 1] = NULL;
+}
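+
+/*
+ * For instance (mirroring the backend-private setup used by the
+ * test_undo_request_manager module; capacity and soft_limit here are
+ * placeholder values), a caller might do:
+ *
+ *		UndoRequestManager *urm;
+ *
+ *		urm = palloc(EstimateUndoRequestManagerSize(capacity));
+ *		InitializeUndoRequestManager(urm, &MyProc->backendLock,
+ *									 capacity, soft_limit);
+ *
+ * A shared-memory caller would instead reserve the same amount of space
+ * when sizing shared memory and pass an LWLock dedicated to this purpose.
+ */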
+
+/*
+ * Register a new undo request. If unable, returns NULL.
+ *
+ * Note that we don't add the undo request to any RBTree at this point
+ * and that there is no mechanism for garbage collecting an entry that is
+ * leaked! The caller must be sure to call either UnregisterUndoRequest
+ * (if the transaction commits) or FinalizeUndoRequest (if it aborts).
+ */
+UndoRequest *
+RegisterUndoRequest(UndoRequestManager *urm, FullTransactionId fxid, Oid dbid)
+{
+	UndoRequest *req;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	req = urm->first_free_request;
+	if (req != NULL)
+	{
+		/* Pop free list. */
+		urm->first_free_request = *(UndoRequest **) req;
+
+		/* Increase utilization. */
+		++urm->utilization;
+
+		/* Initialize request object. */
+		req->fxid = fxid;
+		req->dbid = dbid;
+		req->size = 0;
+		req->start_location_logged = InvalidUndoRecPtr;
+		req->end_location_logged = InvalidUndoRecPtr;
+		req->start_location_unlogged = InvalidUndoRecPtr;
+		req->end_location_unlogged = InvalidUndoRecPtr;
+		req->retry_time = DT_NOBEGIN;
+	}
+
+	LWLockRelease(urm->lock);
+
+	return req;
+}
+
+/*
+ * Set the start location for either logged or unlogged undo.
+ *
+ * We don't need a lock here, because it's only legal to call this after
+ * calling RegisterUndoRequest and before calling UnregisterUndoRequest
+ * or FinalizeUndoRequest. During that window, no other process can be
+ * accessing the UndoRequest we allocated.
+ */
+void
+UndoRequestSetStartLocation(UndoRequestManager *urm, UndoRequest *req,
+							bool is_logged, UndoRecPtr start_location)
+{
+	Assert(UndoRecPtrIsValid(start_location));
+	if (is_logged)
+		req->start_location_logged = start_location;
+	else
+		req->start_location_unlogged = start_location;
+}
+
+/*
+ * Finalize details for an undo request.
+ *
+ * We don't need a lock here for the same reasons as in
+ * UndoRequestSetStartLocation.
+ *
+ * Since an UndoRequest should be registered before beginning to write undo,
+ * the undo size won't be known at that point; this function should be getting
+ * called at prepare time for a prepared transaction, or at abort time
+ * otherwise, by which point the size should be known.
+ *
+ * Caller should report the total size of generated undo in bytes, counting
+ * only logged and unlogged undo that will be processed by background workers.
+ * Any undo bytes that aren't part of the logged or unlogged undo records
+ * that may need cleanup actions performed should not be included in size;
+ * for example, temporary undo doesn't count, as the caller must deal with
+ * that outside of this mechanism.
+ *
+ * Caller must also pass the end location for logged and unlogged undo;
+ * each should be InvalidUndoRecPtr if and only if the corresponding
+ * start location was never set.
+ */
+void
+FinalizeUndoRequest(UndoRequestManager *urm, UndoRequest *req, Size size,
+					UndoRecPtr end_location_logged,
+					UndoRecPtr end_location_unlogged)
+{
+	Assert(size != 0);
+	Assert(UndoRecPtrIsValid(end_location_logged) ||
+		   UndoRecPtrIsValid(end_location_unlogged));
+	Assert(UndoRecPtrIsValid(end_location_logged) ==
+		   UndoRecPtrIsValid(req->start_location_logged));
+	Assert(UndoRecPtrIsValid(end_location_unlogged) ==
+		   UndoRecPtrIsValid(req->start_location_unlogged));
+	req->size = size;
+	req->end_location_logged = end_location_logged;
+	req->end_location_unlogged = end_location_unlogged;
+}
+
+/*
+ * Release a previously-allocated undo request.
+ *
+ * This can be used either on an UndoRequest returned by RegisterUndoRequest
+ * (in the case where the transaction commits, or where it aborts without
+ * actually generating any undo) or on one returned by GetNextUndoRequest
+ * (after any undo actions have been successfully executed).
+ *
+ * Because this function may be called as a post-commit step, it must never
+ * throw an ERROR.
+ */
+void
+UnregisterUndoRequest(UndoRequestManager *urm, UndoRequest *req)
+{
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	/*
+	 * Remove the UndoRequest from any RBTree that contains it.  If the
+	 * retry time is not DT_NOBEGIN, then the request has been finalized
+	 * and undo has subsequently failed.  If the size is 0, the request has
+	 * not been finalized yet, so it's not in any RBTree.
+	 */
+	if (req->retry_time != DT_NOBEGIN)
+		RemoveUndoRequest(&urm->requests_by_retry_time, req);
+	else if (req->size != 0)
+	{
+		RemoveUndoRequest(&urm->requests_by_fxid, req);
+		RemoveUndoRequest(&urm->requests_by_size, req);
+	}
+
+	/* Push onto freelist. */
+	*(UndoRequest **) req = urm->first_free_request;
+	urm->first_free_request = req;
+
+	/* Decrease utilization. */
+	--urm->utilization;
+
+	LWLockRelease(urm->lock);
+}
+
+/*
+ * Try to hand an undo request off for background processing.
+ *
+ * If this function returns true, the UndoRequest can be left for background
+ * processing; the caller need not do anything more. If this function returns
+ * false, the caller should try to process it in the foreground, and must
+ * call either UnregisterUndoRequest on success or RescheduleUndoRequest
+ * on failure.
+ *
+ * Because this function may be called during transaction abort, it must
+ * never throw an ERROR. Technically, InsertUndoRequest might reach
+ * UndoRequestNodeAllocate which could ERROR if the freelist is empty, but
+ * if that happens there's a bug someplace.
+ */
+bool
+PerformUndoInBackground(UndoRequestManager *urm, UndoRequest *req)
+{
+	bool	background;
+
+	/*
+	 * If we failed after allocating an UndoRequest but before setting any
+	 * start locations, there's no work to be done.  In that case, we can
+	 * just unregister the request. Note that no concurrent access to our
+	 * UndoRequest is possible at this stage, since it is not on the freelist
+	 * or accessible via any RBTree.
+	 */
+	if (!UndoRecPtrIsValid(req->start_location_logged) &&
+		!UndoRecPtrIsValid(req->start_location_unlogged))
+	{
+		UnregisterUndoRequest(urm, req);
+		return true;
+	}
+
+	/*
+	 * We need to check shared state in order to determine whether or not
+	 * to perform this undo in the background, and if we are going to perform
+	 * it in the background, also to add it to requests_by_fxid and
+	 * requests_by_size.
+	 */
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+	background = BackgroundUndoOK(urm, req);
+	if (background)
+	{
+		/*
+		 * We're going to handle this in the background, so add it to
+		 * requests_by_fxid and requests_by_size, so that GetNextUndoRequest
+		 * can find it.
+		 */
+		InsertUndoRequest(&urm->requests_by_fxid, req);
+		InsertUndoRequest(&urm->requests_by_size, req);
+	}
+	LWLockRelease(urm->lock);
+
+	return background;
+}
+
+/*
+ * Get an undo request that needs background processing.
+ *
+ * Unless dbid is InvalidOid, any request returned must be from the indicated
+ * database.  If minimum_runtime_reached is true, the caller only wants to
+ * process another request if the next request happens to be from the correct
+ * database. If it's false, the caller wants to avoid exiting too quickly,
+ * and would like to process a request from that database if there's one
+ * available.
+ *
+ * If no suitable request is found, *fxid gets InvalidFullTransactionId;
+ * otherwise, *fxid gets the FullTransactionId of the transaction and
+ * the parameters which follow get the start and end locations of logged
+ * and unlogged undo for that transaction.  It's possible that the transaction
+ * wrote only logged undo or only unlogged undo, in which case the other
+ * pair of fields will have a value of InvalidUndoRecPtr, but it should never
+ * happen that all of the fields get InvalidUndoRecPtr, because that would
+ * mean we queued up an UndoRequest to do nothing.
+ *
+ * This function, as a side effect, removes the returned UndoRequest from
+ * any RBTree that contains it, so that no other backend will attempt to
+ * process it simultaneously. The caller must be certain to call either
+ * UnregisterUndoRequest (if successful) or RescheduleUndoRequest (on
+ * failure) to avoid leaking the UndoRequest.
+ */
+UndoRequest *
+GetNextUndoRequest(UndoRequestManager *urm, Oid dbid,
+				   bool minimum_runtime_reached,
+				   FullTransactionId *fxid,
+				   UndoRecPtr *start_location_logged,
+				   UndoRecPtr *end_location_logged,
+				   UndoRecPtr *start_location_unlogged,
+				   UndoRecPtr *end_location_unlogged)
+{
+	UndoRequest *req = NULL;
+	int 	nloops;
+	bool	saw_db_mismatch = false;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	/* Some sources might have no available work, so loop until all three are checked. */
+	for (nloops = 0; nloops < 3; ++nloops)
+	{
+		RBTree *rbt;
+		UndoRequestSource source = urm->source;
+		UndoRequestNode *node;
+
+		/*
+		 * We rotate between the three possible sources of UndoRequest objects.
+		 *
+		 * The idea here is that processing the requests with the oldest
+		 * transaction IDs is important because it helps us discard undo log
+		 * data sooner and because it allows XID horizons to advance. On
+		 * the other hand, handling transactions that generated a very large
+		 * amount of undo is also a priority, because undo will probably
+		 * take a long time to finish and thus should be started as early as
+		 * possible and also because it likely touched a large number of
+		 * pages which will be slow to access until the undo is processed.
+		 *
+		 * However, we also need to make sure to periodically retry undo for
+		 * transactions that previously failed. We hope that this will be very
+		 * rare, but if it does happen we can neither afford to retry those
+		 * transactions over and over in preference to all others, nor on the
+		 * other hand to just ignore them forever.
+		 *
+		 * We could try to come up with some scoring system that assigns
+		 * relative levels of importance to FullTransactionId age, undo size,
+		 * and retry time, but it seems difficult to come up with a weighting
+		 * system that can ensure that nothing gets starved. By rotating among
+		 * the sources evenly, we know that as long as we continue to process
+		 * undo requests on some sort of regular basis, each source will get
+		 * some amount of attention.
+		 */
+		switch (source)
+		{
+			case UNDO_SOURCE_FXID:
+				rbt = &urm->requests_by_fxid;
+				urm->source = UNDO_SOURCE_SIZE;
+				break;
+			case UNDO_SOURCE_SIZE:
+				rbt = &urm->requests_by_size;
+				urm->source = UNDO_SOURCE_RETRY_TIME;
+				break;
+			case UNDO_SOURCE_RETRY_TIME:
+				rbt = &urm->requests_by_retry_time;
+				urm->source = UNDO_SOURCE_FXID;
+				break;
+		}
+
+		/* Get highest-priority item. */
+		node = (UndoRequestNode *) rbt_leftmost(rbt);
+		if (node == NULL)
+			continue;
+
+		/*
+		 * We can only take an item from the retry time RBTree if the retry
+		 * time is in the past.
+		 */
+		if (source == UNDO_SOURCE_RETRY_TIME &&
+			node->req->retry_time > GetCurrentTimestamp())
+			continue;
+
+		/*
+		 * If a database OID was specified, it must match. If it does not,
+		 * we go ahead and try any remaining RBTree.  Note that this needs to
+		 * be after the other tests so that we get the right value for the
+		 * saw_db_mismatch flag.
+		 */
+		if (OidIsValid(dbid) && node->req->dbid != dbid)
+		{
+			saw_db_mismatch = true;
+			continue;
+		}
+
+		/* Looks like we have a winner. */
+		req = node->req;
+		break;
+	}
+
+	/*
+	 * Determine whether we should do a more exhaustive search.
+	 *
+	 * If we found a node, we don't need to look any harder.  If we didn't
+	 * see a database mismatch, then looking harder can't help: there's nothing
+	 * to do at all, never mind for which database.  If the caller set
+	 * minimum_runtime_reached, then they don't want us to look harder.
+	 */
+	if (req == NULL && saw_db_mismatch && !minimum_runtime_reached)
+		req = FindUndoRequestForDatabase(urm, dbid);
+
+	/*
+	 * If we found a suitable request, remove it from any RBTree that
+	 * contains it.
+	 */
+	if (req != NULL)
+	{
+		if (req->retry_time != DT_NOBEGIN)
+			RemoveUndoRequest(&urm->requests_by_retry_time, req);
+		else
+		{
+			RemoveUndoRequest(&urm->requests_by_fxid, req);
+			RemoveUndoRequest(&urm->requests_by_size, req);
+		}
+	}
+
+	LWLockRelease(urm->lock);
+
+	/*
+	 * Set output parameters.  We've already removed the UndoRequest from any
+	 * RBTree that contained it, so it's safe to do this without the lock.
+	 */
+	if (req == NULL)
+		*fxid = InvalidFullTransactionId;
+	else
+	{
+		*fxid = req->fxid;
+		*start_location_logged = req->start_location_logged;
+		*end_location_logged = req->end_location_logged;
+		*start_location_unlogged = req->start_location_unlogged;
+		*end_location_unlogged = req->end_location_unlogged;
+	}
+
+	/* All done. */
+	return req;
+}
+
+/*
+ * Reschedule an undo request after undo failure.
+ *
+ * This function should be called when undo processing fails, either in the
+ * foreground or in the background.  The foreground case occurs when
+ * FinalizeUndoRequest returns false and undo then also fails; the background
+ * case occurs when GetNextUndoRequest returns an UndoRequest and undo then
+ * fails. Note that this function isn't used after a shutdown or crash: see
+ * comments in RecreateUndoRequest for how we handle that case.
+ *
+ * In either of the cases where this function is reached, the UndoRequest
+ * should not be in any RBTree. If it's a foreground undo failure, it's never
+ * been added; if it's a background undo failure, it was removed by
+ * GetNextUndoRequest. So, we don't have to remove the request from anywhere,
+ * not even conditionally; we just need to add it to the set of failed
+ * requests.
+ *
+ * Because this function may be called during transaction abort, it must
+ * never throw an ERROR. Technically, InsertUndoRequest might reach
+ * UndoRequestNodeAllocate which could ERROR if the freelist is empty, but
+ * if that happens there's a bug someplace.
+ */
+void
+RescheduleUndoRequest(UndoRequestManager *urm, UndoRequest *req)
+{
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+
+	/*
+	 * This algorithm for determining the next retry time is fairly
+	 * unsophisticated: the first retry happens after 10 seconds, and each
+	 * subsequent retry after 30 seconds. We could do something more
+	 * complicated here, but we'd need to do more bookkeeping and it's
+	 * unclear what we'd gain.
+	 */
+	if (req->retry_time == DT_NOBEGIN)
+		req->retry_time =
+			TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 10 * 1000);
+	else
+		req->retry_time =
+			TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 30 * 1000);
+
+	InsertUndoRequest(&urm->requests_by_retry_time, req);
+	LWLockRelease(urm->lock);
+}
+
+/*
+ * Recreate UndoRequest state after a shutdown.
+ *
+ * This function is expected to be called after a shutdown, whether a clean
+ * shutdown or a crash, both for aborted transactions with unprocessed undo
+ * and also for prepared transactions.  All calls to this function must be
+ * completed, and SuspendPreparedUndoRequest must be called for every prepared
+ * transaction, before the first call to GetNextUndoRequest occurs.
+ *
+ * This function may be called up to twice per FullTransactionId, once with
+ * is_logged true and once with is_logged false, because the transaction may
+ * have both logged and unlogged undo in different places. start_location is
+ * the beginning of the type of undo indicated by the is_logged parameter, and
+ * size is the amount of such undo in bytes.  If this function is called twice,
+ * the result will be a single UndoRequest containing both start locations and
+ * a size which is the sum of the two sizes passed to the separate calls.
+ *
+ * If this function is unable to allocate a new UndoRequest when required,
+ * it will return false.  If that happens, it's not safe to continue using
+ * this UndoRequestManager and a system-wide shutdown to raise the limit on
+ * the number of outstanding requests is indicated.
+ */
+bool
+RecreateUndoRequest(UndoRequestManager *urm, FullTransactionId fxid,
+					Oid dbid, bool is_logged, UndoRecPtr start_location,
+					UndoRecPtr end_location, Size size)
+{
+	UndoRequest *req;
+
+	Assert(UndoRecPtrIsValid(start_location));
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+	req = FindUndoRequest(urm, fxid);
+	if (req)
+	{
+		/* Already called for opposite value of is_logged. */
+		if (is_logged)
+		{
+			Assert(!UndoRecPtrIsValid(req->start_location_logged));
+			Assert(!UndoRecPtrIsValid(req->end_location_logged));
+			req->start_location_logged = start_location;
+			req->end_location_logged = end_location;
+		}
+		else
+		{
+			Assert(!UndoRecPtrIsValid(req->start_location_unlogged));
+			Assert(!UndoRecPtrIsValid(req->end_location_unlogged));
+			req->start_location_unlogged = start_location;
+			req->end_location_unlogged = end_location;
+		}
+		Assert(req->dbid == dbid);
+
+		/* Adjusting size may change position in RBTree. */
+		RemoveUndoRequest(&urm->requests_by_size, req);
+		req->size += size;
+		InsertUndoRequest(&urm->requests_by_size, req);
+	}
+	else
+	{
+		/* First call for this FullTransactionId. */
+		req = urm->first_free_request;
+		if (req == NULL)
+		{
+			LWLockRelease(urm->lock);
+			return false;
+		}
+
+		/* We got an item; pop it from the free list. */
+		urm->first_free_request = *(UndoRequest **) req;
+
+		/* Increase utilization. */
+		++urm->utilization;
+
+		/* Initialize request object. */
+		req->fxid = fxid;
+		req->dbid = dbid;
+		req->size = size;
+		req->start_location_logged = InvalidUndoRecPtr;
+		req->end_location_logged = InvalidUndoRecPtr;
+		req->start_location_unlogged = InvalidUndoRecPtr;
+		req->end_location_unlogged = InvalidUndoRecPtr;
+		req->retry_time = DT_NOBEGIN;
+		if (is_logged)
+		{
+			req->start_location_logged = start_location;
+			req->end_location_logged = end_location;
+		}
+		else
+		{
+			req->start_location_unlogged = start_location;
+			req->end_location_unlogged = end_location;
+		}
+
+		/*
+		 * Put it where undo workers will see it.  Note that we assume that
+		 * these are new aborts, but it's possible that there are actually
+		 * a whole series of previous undo failures before the shutdown or
+		 * crash. If we had the information about whether this request had
+		 * failed previously, we could set req->retry_time and insert it into
+		 * requests_by_retry_time rather than requests_by_fxid and
+		 * requests_by_size, but it doesn't seem important to retain
+		 * information about undo failure across crashes or shutdowns, because
+		 * we're just trying to guarantee that we don't busy-loop or starve
+		 * other requests. (FindUndoRequest would get confused, too.)
+		 */
+		InsertUndoRequest(&urm->requests_by_fxid, req);
+		InsertUndoRequest(&urm->requests_by_size, req);
+	}
+
+	LWLockRelease(urm->lock);
+	return true;
+}
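+
+/*
+ * For example (a sketch; the location and size variables are placeholders
+ * and the boolean results are not checked here), recovery code that found
+ * both logged and unlogged undo for a single transaction would make two
+ * calls:
+ *
+ *		RecreateUndoRequest(urm, fxid, dbid, true,
+ *							logged_start, logged_end, logged_size);
+ *		RecreateUndoRequest(urm, fxid, dbid, false,
+ *							unlogged_start, unlogged_end, unlogged_size);
+ *
+ * leaving behind a single UndoRequest that covers both undo ranges and
+ * whose size is logged_size + unlogged_size.
+ */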
+
+/*
+ * Adjust UndoRequestManager state for prepared transactions.
+ *
+ * After a restart, once all calls to RecreateUndoRequest have been completed
+ * and before the first call to GetNextUndoRequest, this function should
+ * be called for each prepared transaction. That's necessary to avoid
+ * prematurely executing undo actions for transactions that haven't aborted
+ * yet and might go on to commit. It removes the entries for the transaction
+ * from each RBTree so that GetNextUndoRequest does not find them.
+ *
+ * The caller should retain a pointer to the returned UndoRequest and, when
+ * the prepared transaction is eventually committed or rolled back, should
+ * invoke UnregisterUndoRequest on commit or PerformUndoInBackground on abort.
+ */
+UndoRequest *
+SuspendPreparedUndoRequest(UndoRequestManager *urm, FullTransactionId fxid)
+{
+	UndoRequest *req;
+
+	LWLockAcquire(urm->lock, LW_EXCLUSIVE);
+	req = FindUndoRequest(urm, fxid);
+	Assert(req != NULL);
+	Assert(req->size != 0);
+	RemoveUndoRequest(&urm->requests_by_fxid, req);
+	RemoveUndoRequest(&urm->requests_by_size, req);
+	LWLockRelease(urm->lock);
+
+	return req;
+}
+
+/*
+ * Perform a left-to-right search of all three RBTrees, looking for a request
+ * for a given database. The searches are interleaved so that we latch
+ * onto the highest-priority request in any RBTree.
+ *
+ * It's possible that we should have some kind of limit on this search, so
+ * that it doesn't do an exhaustive search of every RBTree. However, it's not
+ * exactly clear how that would affect the behavior, or how to pick a
+ * reasonable limit.
+ */
+static UndoRequest *
+FindUndoRequestForDatabase(UndoRequestManager *urm, Oid dbid)
+{
+	RBTreeIterator	iter[3];
+	int		doneflags = 0;
+	int		i = 0;
+
+	rbt_begin_iterate(&urm->requests_by_fxid, LeftRightWalk, &iter[0]);
+	rbt_begin_iterate(&urm->requests_by_size, LeftRightWalk, &iter[1]);
+	rbt_begin_iterate(&urm->requests_by_retry_time, LeftRightWalk, &iter[2]);
+
+	while (1)
+	{
+		UndoRequestNode *node;
+
+		if ((doneflags & (1 << i)) == 0)
+		{
+			node = (UndoRequestNode *) rbt_iterate(&iter[i]);
+			if (node == NULL)
+			{
+				doneflags |= 1 << i;
+				if (doneflags == 7)		/* all bits set */
+					break;
+			}
+			else if (node->req->dbid == dbid)
+				return node->req;
+		}
+		i = (i + 1) % 3;
+	}
+
+	return NULL;
+}
+
+/*
+ * Is it OK to handle this UndoRequest in the background?
+ */
+static bool
+BackgroundUndoOK(UndoRequestManager *urm, UndoRequest *req)
+{
+	/*
+	 * If we've passed the soft size limit, it's not OK to background it.
+	 */
+	if (urm->utilization > urm->soft_size_limit)
+		return false;
+
+	/*
+	 * Otherwise, allow it.
+	 *
+	 * TODO: We probably want to introduce some additional rules here based
+	 * on the size of the request.
+	 */
+	return true;
+}
+
+/*
+ * RBTree callback to allocate an UndoRequestNode.
+ *
+ * Everything is preallocated, so we're just popping the freelist.
+ */
+static RBTNode *
+UndoRequestNodeAllocate(void *arg)
+{
+	UndoRequestManager *urm = arg;
+	UndoRequestNode *node = urm->first_free_request_node;
+
+	/*
+	 * Any given UndoRequest should either be in both requests_by_fxid
+	 * and requests_by_size, or it should be in requests_by_retry_time,
+	 * or it should be in neither RBTree; consequently, it should be impossible
+	 * to use more than 2 UndoRequestNode objects per UndoRequest. Since we
+	 * preallocate that number, we should never run out. In case there's a
+	 * bug in the logic, let's insert a runtime check here even when Asserts
+	 * are disabled.
+	 */
+	if (node == NULL)
+		elog(ERROR, "no free UndoRequestNode");
+
+	/* Pop freelist. */
+	urm->first_free_request_node = *(UndoRequestNode **) node;
+
+	return &node->rbtnode;
+}
+
+/*
+ * RBTree callback to free an UndoRequestNode.
+ *
+ * Just put it back on the freelist.
+ */
+static void
+UndoRequestNodeFree(RBTNode *x, void *arg)
+{
+	UndoRequestManager *urm = arg;
+	UndoRequestNode *node = (UndoRequestNode *) x;
+
+	*(UndoRequestNode **) node = urm->first_free_request_node;
+	urm->first_free_request_node = node;
+}
+
+/*
+ * RBTree callback to combine an UndoRequestNode with another one.
+ *
+ * The key for every RBTree includes the FXID, which is unique, so it should
+ * never happen that we need to merge requests.
+ */
+static void
+UndoRequestNodeCombine(RBTNode *existing, const RBTNode *newdata, void *arg)
+{
+	elog(ERROR, "undo requests should never need to be combined");
+}
+
+/*
+ * RBTree comparator for requests_by_retry_time. Older retry
+ * times first; in the case of a tie, smaller FXIDs first.  This avoids ties,
+ * which is important since we don't want to merge requests, and also favors
+ * retiring older transactions first, which is generally desirable.
+ */
+static int
+UndoRequestNodeCompareRetryTime(const RBTNode *a, const RBTNode *b, void *arg)
+{
+	const UndoRequestNode *aa = (UndoRequestNode *) a;
+	const UndoRequestNode *bb = (UndoRequestNode *) b;
+	FullTransactionId fxid_a = aa->req->fxid;
+	FullTransactionId fxid_b = bb->req->fxid;
+	TimestampTz retry_time_a = aa->req->retry_time;
+	TimestampTz retry_time_b = bb->req->retry_time;
+
+	if (retry_time_a != retry_time_b)
+		return retry_time_a < retry_time_b ? -1 : 1;
+
+	if (FullTransactionIdPrecedes(fxid_a, fxid_b))
+		return -1;
+	else if (FullTransactionIdPrecedes(fxid_b, fxid_a))
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * RBTree comparator for requests_by_fxid. Lower FXIDs first. No tiebreak,
+ * because FXIDs should be unique.
+ */
+static int
+UndoRequestNodeCompareFXID(const RBTNode *a, const RBTNode *b, void *arg)
+{
+	const UndoRequestNode *aa = (UndoRequestNode *) a;
+	const UndoRequestNode *bb = (UndoRequestNode *) b;
+	FullTransactionId fxid_a = aa->req->fxid;
+	FullTransactionId fxid_b = bb->req->fxid;
+
+	if (FullTransactionIdPrecedes(fxid_a, fxid_b))
+		return -1;
+	else if (FullTransactionIdPrecedes(fxid_b, fxid_a))
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * RBTree comparator for requests_by_size. Larger sizes first. As we do for
+ * the retry time RBTree, break ties in favor of lower FXIDs.
+ */
+static int
+UndoRequestNodeCompareSize(const RBTNode *a, const RBTNode *b, void *arg)
+{
+	const UndoRequestNode *aa = (UndoRequestNode *) a;
+	const UndoRequestNode *bb = (UndoRequestNode *) b;
+	FullTransactionId fxid_a = aa->req->fxid;
+	FullTransactionId fxid_b = bb->req->fxid;
+	Size size_a = aa->req->size;
+	Size size_b = bb->req->size;
+
+	if (size_a != size_b)
+		return size_a < size_b ? 1 : -1;
+
+	if (FullTransactionIdPrecedes(fxid_a, fxid_b))
+		return -1;
+	else if (FullTransactionIdPrecedes(fxid_b, fxid_a))
+		return 1;
+	else
+		return 0;
+}
+
+/*
+ * Insert an UndoRequest into one RBTree.
+ *
+ * The actual RBTree element is an UndoRequestNode, which just points to
+ * the actual UndoRequest.
+ */
+static void
+InsertUndoRequest(RBTree *rbt, UndoRequest *req)
+{
+	UndoRequestNode	dummy;
+	bool isNew;
+
+	/*
+	 * The rbt_insert interface is a bit strange: we have to pass something
+	 * that looks like an RBTNode, but the RBTNode itself doesn't need to
+	 * be initialized - only the "extra" data that follows the end of the
+	 * structure needs to be correct.
+	 */
+	dummy.req = req;
+	rbt_insert(rbt, &dummy.rbtnode, &isNew);
+	Assert(isNew);
+}
+
+/*
+ * Remove an UndoRequest from one RBTree.
+ *
+ * This is just the reverse of InsertUndoRequest, with the same interface
+ * quirk.
+ */
+static void
+RemoveUndoRequest(RBTree *rbt, UndoRequest *req)
+{
+	UndoRequestNode	dummy;
+	RBTNode *node;
+
+	dummy.req = req;
+	node = rbt_find(rbt, &dummy.rbtnode);
+	rbt_delete(rbt, node);
+}
+
+/*
+ * Find an UndoRequest by FXID.
+ *
+ * If we needed to do this frequently, it might be worth maintaining a hash
+ * table mapping FXID -> UndoRequest, but since we only need it after a system
+ * restart, RBTree's O(lg n) performance seems good enough.
+ *
+ * Note that this can only find an UndoRequest that has not failed and is not
+ * yet being processed, because a failed UndoRequest would be in
+ * requests_by_retry_time, not requests_by_fxid, and an in-progress
+ * UndoRequest wouldn't be in either data structure. That restriction, too,
+ * is OK for current uses.
+ */
+static UndoRequest *
+FindUndoRequest(UndoRequestManager *urm, FullTransactionId fxid)
+{
+	UndoRequest	dummy_request;
+	UndoRequestNode	dummy_node;
+	RBTNode *node;
+
+	/*
+	 * Here we need both a dummy UndoRequest and a dummy UndoRequestNode; only
+	 * the comparator will look at the dummy UndoRequestNode, and it will only
+	 * look at the UndoRequest, and specifically its FXID.
+	 */
+	dummy_request.fxid = fxid;
+	dummy_node.req = &dummy_request;
+	node = rbt_find(&urm->requests_by_fxid, &dummy_node.rbtnode);
+	if (node == NULL)
+		return NULL;
+	return ((UndoRequestNode *) node)->req;
+}
diff --git a/src/backend/lib/rbtree.c b/src/backend/lib/rbtree.c
index 33181e9211..bda870eab7 100644
--- a/src/backend/lib/rbtree.c
+++ b/src/backend/lib/rbtree.c
@@ -35,25 +35,6 @@
 #define RBTBLACK	(0)
 #define RBTRED		(1)
 
-/*
- * RBTree control structure
- */
-struct RBTree
-{
-	RBTNode    *root;			/* root node, or RBTNIL if tree is empty */
-
-	/* Remaining fields are constant after rbt_create */
-
-	Size		node_size;		/* actual size of tree nodes */
-	/* The caller-supplied manipulation functions */
-	rbt_comparator comparator;
-	rbt_combiner combiner;
-	rbt_allocfunc allocfunc;
-	rbt_freefunc freefunc;
-	/* Passthrough arg passed to all manipulation functions */
-	void	   *arg;
-};
-
 /*
  * all leafs are sentinels, use customized NIL name to prevent
  * collision with system-wide constant NIL which is actually NULL
@@ -122,6 +103,33 @@ rbt_create(Size node_size,
 	return tree;
 }
 
+/*
+ * rbt_initialize: initialize an empty RBTree
+ *
+ * This is just like rbt_create, except that the caller is responsible for
+ * allocating the memory.
+ */
+void
+rbt_initialize(RBTree *rbt,
+			   Size node_size,
+			   rbt_comparator comparator,
+			   rbt_combiner combiner,
+			   rbt_allocfunc allocfunc,
+			   rbt_freefunc freefunc,
+			   void *arg)
+{
+	Assert(node_size > sizeof(RBTNode));
+
+	rbt->root = RBTNIL;
+	rbt->node_size = node_size;
+	rbt->comparator = comparator;
+	rbt->combiner = combiner;
+	rbt->allocfunc = allocfunc;
+	rbt->freefunc = freefunc;
+
+	rbt->arg = arg;
+}
+
 /* Copy the additional data fields from one RBTNode to another */
 static inline void
 rbt_copy_data(RBTree *rbt, RBTNode *dest, const RBTNode *src)
diff --git a/src/include/access/undorequest.h b/src/include/access/undorequest.h
new file mode 100644
index 0000000000..11324b4888
--- /dev/null
+++ b/src/include/access/undorequest.h
@@ -0,0 +1,77 @@
+/*-------------------------------------------------------------------------
+ *
+ * undorequest.h
+ *		Undo request manager.
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/access/undorequest.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef UNDOREQUEST_H
+#define UNDOREQUEST_H
+
+#include "access/transam.h"
+#include "access/undolog.h"
+#include "datatype/timestamp.h"
+
+struct UndoRequest;
+struct UndoRequestManager;
+typedef struct UndoRequest UndoRequest;
+typedef struct UndoRequestManager UndoRequestManager;
+
+/* Initialization functions. */
+extern Size EstimateUndoRequestManagerSize(Size capacity);
+extern void InitializeUndoRequestManager(UndoRequestManager *urm,
+										 LWLock *lock, Size capacity,
+										 Size soft_limit);
+
+/* Call this before inserting undo records. */
+extern UndoRequest *RegisterUndoRequest(UndoRequestManager *urm,
+										FullTransactionId fxid,
+										Oid dbid);
+
+/* Remember where our undo starts and ends. */
+extern void UndoRequestSetStartLocation(UndoRequestManager *urm,
+										UndoRequest *req,
+										bool is_logged,
+										UndoRecPtr start_location);
+
+/* Remember undo size and end locations. */
+extern void FinalizeUndoRequest(UndoRequestManager *urm,
+								UndoRequest *req,
+								Size size,
+								UndoRecPtr end_location_logged,
+								UndoRecPtr end_location_unlogged);
+
+/* Forget about an UndoRequest we don't need any more. */
+extern void UnregisterUndoRequest(UndoRequestManager *urm, UndoRequest *req);
+
+/* Attempt to dispatch UndoRequest for background processing. */
+extern bool PerformUndoInBackground(UndoRequestManager *urm, UndoRequest *req);
+
+/* Get work for background undo process. */
+extern UndoRequest *GetNextUndoRequest(UndoRequestManager *urm, Oid dbid,
+									   bool minimum_runtime_reached,
+									   FullTransactionId *fxid,
+									   UndoRecPtr *start_location_logged,
+									   UndoRecPtr *end_location_logged,
+									   UndoRecPtr *start_location_unlogged,
+									   UndoRecPtr *end_location_unlogged);
+
+/* Reschedule failed undo attempt. */
+extern void RescheduleUndoRequest(UndoRequestManager *urm, UndoRequest *req);
+
+/* Restore state after crash. */
+extern bool RecreateUndoRequest(UndoRequestManager *urm,
+								FullTransactionId fxid, Oid dbid,
+								bool is_logged,
+								UndoRecPtr start_location,
+								UndoRecPtr end_location,
+								Size size);
+extern UndoRequest *SuspendPreparedUndoRequest(UndoRequestManager *urm,
+											   FullTransactionId fxid);
+
+#endif
diff --git a/src/include/lib/rbtree.h b/src/include/lib/rbtree.h
index 6d79a24015..ff6f99a932 100644
--- a/src/include/lib/rbtree.h
+++ b/src/include/lib/rbtree.h
@@ -28,8 +28,33 @@ typedef struct RBTNode
 	struct RBTNode *parent;		/* parent, or NULL (not RBTNIL!) if none */
 } RBTNode;
 
-/* Opaque struct representing a whole tree */
-typedef struct RBTree RBTree;
+/* Support functions to be provided by caller */
+typedef int (*rbt_comparator) (const RBTNode *a, const RBTNode *b, void *arg);
+typedef void (*rbt_combiner) (RBTNode *existing, const RBTNode *newdata, void *arg);
+typedef RBTNode *(*rbt_allocfunc) (void *arg);
+typedef void (*rbt_freefunc) (RBTNode *x, void *arg);
+
+/*
+ * RBTree control structure
+ *
+ * This is declared here to make it possible to preallocate an object of
+ * the correct size, but callers should not access the members directly.
+ */
+typedef struct RBTree
+{
+	RBTNode    *root;			/* root node, or RBTNIL if tree is empty */
+
+	/* Remaining fields are constant after rbt_create or rbt_initialize */
+
+	Size		node_size;		/* actual size of tree nodes */
+	/* The caller-supplied manipulation functions */
+	rbt_comparator comparator;
+	rbt_combiner combiner;
+	rbt_allocfunc allocfunc;
+	rbt_freefunc freefunc;
+	/* Passthrough arg passed to all manipulation functions */
+	void	   *arg;
+} RBTree;
 
 /* Available tree iteration orderings */
 typedef enum RBTOrderControl
@@ -53,18 +78,19 @@ struct RBTreeIterator
 	bool		is_over;
 };
 
-/* Support functions to be provided by caller */
-typedef int (*rbt_comparator) (const RBTNode *a, const RBTNode *b, void *arg);
-typedef void (*rbt_combiner) (RBTNode *existing, const RBTNode *newdata, void *arg);
-typedef RBTNode *(*rbt_allocfunc) (void *arg);
-typedef void (*rbt_freefunc) (RBTNode *x, void *arg);
-
 extern RBTree *rbt_create(Size node_size,
 						  rbt_comparator comparator,
 						  rbt_combiner combiner,
 						  rbt_allocfunc allocfunc,
 						  rbt_freefunc freefunc,
 						  void *arg);
+extern void rbt_initialize(RBTree *rbt,
+						   Size node_size,
+						   rbt_comparator comparator,
+						   rbt_combiner combiner,
+						   rbt_allocfunc allocfunc,
+						   rbt_freefunc freefunc,
+						   void *arg);
 
 extern RBTNode *rbt_find(RBTree *rbt, const RBTNode *data);
 extern RBTNode *rbt_leftmost(RBTree *rbt);
diff --git a/src/test/modules/Makefile b/src/test/modules/Makefile
index 60d6d7be1b..f32afffab1 100644
--- a/src/test/modules/Makefile
+++ b/src/test/modules/Makefile
@@ -19,6 +19,7 @@ SUBDIRS = \
 		  test_rbtree \
 		  test_rls_hooks \
 		  test_shm_mq \
+		  test_undo_request_manager \
 		  unsafe_tests \
 		  worker_spi
 
diff --git a/src/test/modules/test_undo_request_manager/Makefile b/src/test/modules/test_undo_request_manager/Makefile
new file mode 100644
index 0000000000..5bc4695004
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/Makefile
@@ -0,0 +1,21 @@
+# src/test/modules/test_undo_request_manager/Makefile
+
+MODULE_big = test_undo_request_manager
+OBJS = test_undo_request_manager.o $(WIN32RES)
+PGFILEDESC = "test_undo_request_manager - test undo request manager code"
+
+EXTENSION = test_undo_request_manager
+DATA = test_undo_request_manager--1.0.sql
+
+REGRESS = test_undo_request_manager
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = src/test/modules/test_undo_request_manager
+top_builddir = ../../../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out b/src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out
new file mode 100644
index 0000000000..c79611b3b6
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/expected/test_undo_request_manager.out
@@ -0,0 +1,28 @@
+CREATE EXTENSION test_undo_request_manager;
+-- not enough space
+select urm_simple_test(1, '{10000,20000}');
+ERROR:  unable to register undo request #2
+-- simple case
+select urm_simple_test(2, '{10000,20000}');
+ urm_simple_test 
+-----------------
+ {1001,1002}
+(1 row)
+
+-- should alternate between early and large requests in order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,1000000,1000000,1000000}');
+                urm_simple_test                 
+------------------------------------------------
+ {1001,1006,1002,1007,1003,1008,1004,1009,1005}
+(1 row)
+
+-- should alternate between early and large requests, but the large requests
+-- should be processed in reverse order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,2000000,3000000,4000000,50000000}');
+                   urm_simple_test                   
+-----------------------------------------------------
+ {1001,1010,1002,1009,1003,1008,1004,1007,1005,1006}
+(1 row)
+
diff --git a/src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql b/src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql
new file mode 100644
index 0000000000..6611e040b6
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/sql/test_undo_request_manager.sql
@@ -0,0 +1,16 @@
+CREATE EXTENSION test_undo_request_manager;
+
+-- not enough space
+select urm_simple_test(1, '{10000,20000}');
+
+-- simple case
+select urm_simple_test(2, '{10000,20000}');
+
+-- should alternate between early and large requests in order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,1000000,1000000,1000000}');
+
+-- should alternate between early and large requests, but the large requests
+-- should be processed in reverse order
+select urm_simple_test(10,
+'{10000,20000,30000,40000,50000,1000000,2000000,3000000,4000000,50000000}');
diff --git a/src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql b/src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql
new file mode 100644
index 0000000000..30ff471c23
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql
@@ -0,0 +1,9 @@
+/* src/test/modules/test_undo_request_manager/test_undo_request_manager--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_undo_request_manager" to load this file. \quit
+
+CREATE FUNCTION urm_simple_test(capacity pg_catalog.int4,
+								requests pg_catalog.int8[])
+    RETURNS pg_catalog.int8[] STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/src/test/modules/test_undo_request_manager/test_undo_request_manager.c b/src/test/modules/test_undo_request_manager/test_undo_request_manager.c
new file mode 100644
index 0000000000..8b994283c8
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/test_undo_request_manager.c
@@ -0,0 +1,139 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_undo_request_manager.c
+ *		Test undo request manager.
+ *
+ * Copyright (c) 2013-2019, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		src/test/modules/test_undo_request_manager/test_undo_request_manager.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/undorequest.h"
+#include "catalog/pg_type_d.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "storage/proc.h"
+#include "utils/array.h"
+
+PG_MODULE_MAGIC;
+PG_FUNCTION_INFO_V1(urm_simple_test);
+
+/*
+ * SQL-callable test function.  We create an UndoRequestManager in
+ * backend-private memory here and exercise it a bit to see if it breaks.
+ *
+ * The first argument is the capacity of the UndoRequestManager as an integer.
+ *
+ * The second argument is a 1-dimensional bigint array, where each element
+ * is a hypothetical undo size.
+ *
+ * This function registers and inserts all the requests (failing if space is
+ * exhausted) with fake, sequentially assigned transaction IDs, and then
+ * fetches them back one by one. The return value is an array of fake
+ * transaction IDs in the order they were returned.
+ *
+ * This test doesn't simulate undo failure, multi-database operation, or
+ * prepared transactions.
+ */
+Datum
+urm_simple_test(PG_FUNCTION_ARGS)
+{
+	int64	capacity = PG_GETARG_INT32(0);
+	ArrayType *array = PG_GETARG_ARRAYTYPE_P(1);
+	Datum	  *darray;
+	int			nentries;
+	Datum	  *dresult;
+	ArrayType *result;
+	UndoRequestManager *urm;
+	const UndoRecPtr SomeValidUndoRecPtr = InvalidUndoRecPtr + 1;
+	int			i;
+	FullTransactionId fake_fxid = FullTransactionIdFromEpochAndXid(0, 1000);
+
+	/* Require positive capacity. */
+	if (capacity <= 0)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("undo request manager capacity must be a positive integer")));
+
+	/* Sanity-check and deconstruct array. */
+	if (ARR_NDIM(array) != 1)
+		ereport(ERROR,
+				(errcode(ERRCODE_ARRAY_ELEMENT_ERROR),
+				 errmsg("array must have exactly 1 dimension")));
+	if (array_contains_nulls(array))
+		ereport(ERROR,
+				(errcode(ERRCODE_ARRAY_ELEMENT_ERROR),
+				 errmsg("cannot work with arrays containing NULLs")));
+	deconstruct_array(array, INT8OID, 8, FLOAT8PASSBYVAL, 'd',
+					  &darray, NULL, &nentries);
+
+	/*
+	 * Initialize UndoRequestManager. We have to supply an LWLock; rather than
+	 * creating a new one somewhere, just use our own backendLock. These locks
+	 * creating a new one somewhere, just use our own backendLock. That lock
+	 * isn't that heavily trafficked and we won't have any reason to take it
+	 * for any other purpose while the UndoRequestManager holds it, so this
+	 *
+	 * We make the soft limit equal to the full capacity here for testing
+	 * purposes, which means that we should always succeed in dispatching to
+	 * the background.
+	 */
+	urm = palloc(EstimateUndoRequestManagerSize(capacity));
+	InitializeUndoRequestManager(urm, &MyProc->backendLock,
+								 capacity, capacity);
+
+	/* Insert entries as provided by caller. */
+	for (i = 0; i < nentries; ++i)
+	{
+		int64	size = DatumGetInt64(darray[i]);
+		UndoRequest *req;
+
+		FullTransactionIdAdvance(&fake_fxid);
+
+		req = RegisterUndoRequest(urm, fake_fxid, MyDatabaseId);
+		if (req == NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("unable to register undo request #%d", i + 1)));
+		UndoRequestSetStartLocation(urm, req, true, SomeValidUndoRecPtr);
+		FinalizeUndoRequest(urm, req, size,
+							SomeValidUndoRecPtr,
+							InvalidUndoRecPtr);
+		if (!PerformUndoInBackground(urm, req))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("unable to background undo request #%d", i + 1)));
+	}
+
+	/* Now get the entries back. */
+	dresult = palloc(nentries * sizeof(Datum));
+	for (i = 0; true; ++i)
+	{
+		UndoRequest *req;
+		UndoRecPtr	p[4];
+
+		/* Get some work. */
+		req = GetNextUndoRequest(urm, MyDatabaseId, true,
+								 &fake_fxid, &p[0], &p[1], &p[2], &p[3]);
+		if (req == NULL)
+			break;
+		if (i >= nentries)
+			elog(ERROR, "found more undo requests than were inserted");
+
+		/* Save the fake FXID. */
+		dresult[i] =
+			Int64GetDatum((int64) U64FromFullTransactionId(fake_fxid));
+
+		/* Report that we successfully processed the imaginary undo. */
+		UnregisterUndoRequest(urm, req);
+	}
+
+	/* Put result into array form. */
+	result = construct_array(dresult, i, INT8OID, 8, FLOAT8PASSBYVAL, 'd');
+	PG_RETURN_ARRAYTYPE_P(result);
+}
diff --git a/src/test/modules/test_undo_request_manager/test_undo_request_manager.control b/src/test/modules/test_undo_request_manager/test_undo_request_manager.control
new file mode 100644
index 0000000000..0a340e9843
--- /dev/null
+++ b/src/test/modules/test_undo_request_manager/test_undo_request_manager.control
@@ -0,0 +1,4 @@
+comment = 'Test code for undo request manager'
+default_version = '1.0'
+module_pathname = '$libdir/test_undo_request_manager'
+relocatable = true
-- 
2.17.2 (Apple Git-113)

