From cb1e99a1bea946f4da3f8ae6b0068d2464adfafa Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Tue, 11 Feb 2025 04:07:21 -0500
Subject: [PATCH v13 2/3] Filter transactions that need not be published

This adds logic to filter transactions early (at decode time, rather than at streaming time) so
they can be skipped if they do not contain tables that are part of the publication list of the
logical replication walsender.

Determining whether to filter a change requires information about the relation
and the publication from the catalog, requiring a transaction to be started.
When most changes in a transaction are unfilterable, the overhead of starting a
transaction for each record is significant. To reduce this overhead a hash cache of relation file
locators is created. Even then a hash search for every record before recording has considerable
overhead especially for use cases where most tables in an instance are published. To further reduce
this overhead a simple approach is used to suspend filtering for a certain number of changes
(100) when an unfilterable change is encountered. In other words, continue filtering changes if
the last record was filtered out. If an unfilterable change is found, skip filtering the next 100
changes.
---
 src/backend/replication/logical/decode.c        |   5 +
 src/backend/replication/logical/reorderbuffer.c | 423 ++++++++++++++++++++----
 src/include/replication/reorderbuffer.h         |  12 +
 src/tools/pgindent/typedefs.list                |   2 +
 4 files changed, 374 insertions(+), 68 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 0bff0f1..c5f1083 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -915,6 +915,11 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	if (ctx->reorder->can_filter_change &&
+		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
+											buf->origptr, &target_locator, true))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
 		change->action = REORDER_BUFFER_CHANGE_INSERT;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 3d22aa9..7be0033 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -108,6 +108,7 @@
 #include "storage/fd.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
@@ -226,8 +227,10 @@ static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 											   XLogRecPtr lsn, bool create_as_top);
 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 											  ReorderBufferTXN *subtxn);
-
 static void AssertTXNLsnOrder(ReorderBuffer *rb);
+static Relation ReorderBufferGetRelation(ReorderBuffer *rb,
+										 RelFileLocator *rlocator,
+										 bool has_tuple);
 
 /* ---------------------------------------
  * support functions for lsn-order iterating over the ->changes of a
@@ -276,6 +279,8 @@ static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
  */
 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
+static Snapshot ReorderBufferStreamTXNSnapshot(ReorderBuffer *rb,
+											   ReorderBufferTXN *txn);
 static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
 
@@ -302,6 +307,48 @@ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											bool addition, Size sz);
 
 /*
+ * ---------------------------------------
+ * RelFileLocator filtering
+ * ---------------------------------------
+ * This hash table serves as a lookup table for determining if a relation can
+ * be filtered before being decoded and queued into the buffer.
+ *
+ * The hash table shares the same lifespan as the reorder buffer. This is
+ * crucial because each reorderbuffer may have different configurations or be
+ * associated with different output plugins, affecting the types of tables to
+ * be processed.
+ */
+
+static HTAB *RelFileLocatorFilterCache = NULL;
+
+static bool relation_callbacks_registered = false;
+
+typedef struct ReorderBufferRelFileLocatorKey
+{
+	Oid			reltablespace;
+	RelFileNumber relfilenumber;
+} ReorderBufferRelFileLocatorKey;
+
+/* entry for hash table we use to track if the relation can be filtered */
+typedef struct ReorderBufferRelFileLocatorEnt
+{
+	ReorderBufferRelFileLocatorKey key;
+	Oid			relid;
+	bool		filterable;
+} ReorderBufferRelFileLocatorEnt;
+
+static void RelFileLocatorCacheInvalidateCallback(Datum arg, Oid relid);
+static void ReorderBufferMemoryResetcallback(void *arg);
+
+/*
+ * After encountering a change that cannot be filtered out, filtering is
+ * temporarily suspended. Filtering resumes after processing every 100 changes.
+ * This strategy helps to minimize the overhead of performing a hash table
+ * search for each record, especially when most changes are not filterable.
+ */
+#define CHANGES_THRESHOLD_FOR_FILTER 100
+
+/*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
  * prior ReorderBuffer instances for the same slot.
  */
@@ -359,6 +406,34 @@ ReorderBufferAllocate(void)
 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	/*
+	 * To support early filtering of changes, this hash table acts
+	 * as a lookup table to determine if the corresponding relation is
+	 * required to be decoded and queued into the buffer. The hash table
+	 * shares the same lifespan as the reorder buffer.
+	 *
+	 * Also setup the callback to invalidate cache when relations are updated.
+	 */
+	hash_ctl.keysize = sizeof(ReorderBufferRelFileLocatorKey);
+	hash_ctl.entrysize = sizeof(ReorderBufferRelFileLocatorEnt);
+	hash_ctl.hcxt = buffer->context;
+
+	RelFileLocatorFilterCache =
+		hash_create("RelFileLocatorFilterCache", 1000, &hash_ctl,
+					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+	buffer->relfile_callback.arg = NULL;
+	buffer->relfile_callback.func = ReorderBufferMemoryResetcallback;
+	MemoryContextRegisterResetCallback(buffer->context, &buffer->relfile_callback);
+
+	if (!relation_callbacks_registered)
+	{
+		/* Watch for invalidation events. */
+		CacheRegisterRelcacheCallback(RelFileLocatorCacheInvalidateCallback,
+									  (Datum) 0);
+		relation_callbacks_registered = true;
+	}
+
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
@@ -369,6 +444,8 @@ ReorderBufferAllocate(void)
 	/* txn_heap is ordered by transaction size */
 	buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
 
+	buffer->can_filter_change = true;
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -840,6 +917,16 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn->nentries++;
 	txn->nentries_mem++;
 
+	/*
+	 * If we're not filtering and we've crossed the change threshold,
+	 * attempt to filter again
+	 */
+	if (!rb->can_filter_change && ++rb->processed_changes >= CHANGES_THRESHOLD_FOR_FILTER)
+	{
+		rb->can_filter_change = true;
+		rb->processed_changes = 0;
+	}
+
 	/* update memory accounting information */
 	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
@@ -2105,6 +2192,81 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 }
 
 /*
+ * Return the relation corresponding to the given RelFileLocator 'rlocator' if
+ * relevant to decoding. Otherwise, return NULL.
+ *
+ * We don't decode catalog files, relations that are not logically logged,
+ * temporary heaps and sequences.
+ */
+static Relation
+ReorderBufferGetRelation(ReorderBuffer *rb, RelFileLocator *rlocator,
+						 bool has_tuple)
+{
+	bool		filterable = false;
+	Relation	relation;
+	Oid			reloid;
+
+	reloid = RelidByRelfilenumber(rlocator->spcOid, rlocator->relNumber);
+
+	if (!OidIsValid(reloid))
+	{
+		if (!has_tuple)
+		{
+			/*
+			 * Mapped catalog tuple without data, emitted while catalog table was in
+			 * the process of being rewritten. We can fail to look up the
+			 * relfilenumber, because the relmapper has no "historic" view, in
+			 * contrast to the normal catalog during decoding. Thus repeated rewrites
+			 * can cause a lookup failure. That's OK because we do not decode catalog
+			 * changes anyway. Normally such tuples would be skipped over below, but
+			 * we can't identify whether the table should be logically logged without
+			 * mapping the relfilenumber to the oid.
+			 */
+			return NULL;
+		}
+
+		elog(ERROR, "could not map filenumber \"%s\" to relation OID",
+				relpathperm(*rlocator, MAIN_FORKNUM));
+	}
+
+	relation = RelationIdGetRelation(reloid);
+
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+			 reloid, relpathperm(*rlocator, MAIN_FORKNUM));
+
+	if (!RelationIsLogicallyLogged(relation))
+		filterable = true;
+
+	else if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+	{
+		/*
+		 * Ignore temporary heaps created during DDL unless the plugin has asked
+		 * for them.
+		 */
+		filterable = true;
+	}
+
+	else if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+	{
+		/*
+		 * For now ignore sequence changes entirely. Most of the time they don't
+		 * log changes using records we understand, so it doesn't make sense to
+		 * handle the few cases we do.
+		 */
+		filterable = true;
+	}
+
+	if (filterable)
+	{
+		RelationClose(relation);
+		return NULL;
+	}
+
+	return relation;
+}
+
+/*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
  * Send data of a transaction (and its subtransactions) to the
@@ -2176,7 +2338,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 		{
 			Relation	relation = NULL;
-			Oid			reloid;
 
 			CHECK_FOR_INTERRUPTS();
 
@@ -2235,55 +2396,11 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_DELETE:
 					Assert(snapshot_now);
 
-					reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
-												  change->data.tp.rlocator.relNumber);
-
-					/*
-					 * Mapped catalog tuple without data, emitted while
-					 * catalog table was in the process of being rewritten. We
-					 * can fail to look up the relfilenumber, because the
-					 * relmapper has no "historic" view, in contrast to the
-					 * normal catalog during decoding. Thus repeated rewrites
-					 * can cause a lookup failure. That's OK because we do not
-					 * decode catalog changes anyway. Normally such tuples
-					 * would be skipped over below, but we can't identify
-					 * whether the table should be logically logged without
-					 * mapping the relfilenumber to the oid.
-					 */
-					if (reloid == InvalidOid &&
-						change->data.tp.newtuple == NULL &&
-						change->data.tp.oldtuple == NULL)
-						goto change_done;
-					else if (reloid == InvalidOid)
-						elog(ERROR, "could not map filenumber \"%s\" to relation OID",
-							 relpathperm(change->data.tp.rlocator,
-										 MAIN_FORKNUM));
-
-					relation = RelationIdGetRelation(reloid);
+					relation = ReorderBufferGetRelation(rb, &change->data.tp.rlocator,
+														(change->data.tp.newtuple != NULL ||
+														 change->data.tp.oldtuple != NULL));
 
 					if (!RelationIsValid(relation))
-						elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
-							 reloid,
-							 relpathperm(change->data.tp.rlocator,
-										 MAIN_FORKNUM));
-
-					if (!RelationIsLogicallyLogged(relation))
-						goto change_done;
-
-					/*
-					 * Ignore temporary heaps created during DDL unless the
-					 * plugin has asked for them.
-					 */
-					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
-						goto change_done;
-
-					/*
-					 * For now ignore sequence changes entirely. Most of the
-					 * time they don't log changes using records we
-					 * understand, so it doesn't make sense to handle the few
-					 * cases we do.
-					 */
-					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
 						goto change_done;
 
 					/* user-triggered change */
@@ -4058,19 +4175,13 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 }
 
 /*
- * Send data of a large transaction (and its subtransactions) to the
- * output plugin, but using the stream API.
+ * This function generates or retrieves a consistent snapshot of a transaction
+ * that is currently in progress for logical replication.
  */
-static void
-ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+static Snapshot
+ReorderBufferStreamTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	Snapshot	snapshot_now;
-	CommandId	command_id;
-	Size		stream_bytes;
-	bool		txn_is_streamed;
-
-	/* We can never reach here for a subtransaction. */
-	Assert(rbtxn_is_toptxn(txn));
 
 	/*
 	 * We can't make any assumptions about base snapshot here, similar to what
@@ -4113,12 +4224,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		if (txn->base_snapshot == NULL)
 		{
 			Assert(txn->ninvalidations == 0);
-			return;
+			return NULL;
 		}
 
-		command_id = FirstCommandId;
 		snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
-											 txn, command_id);
+											 txn, FirstCommandId);
 	}
 	else
 	{
@@ -4131,17 +4241,40 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		 * the LSN condition in the previous branch (so no need to walk
 		 * through subxacts again). In fact, we must not do that as we may be
 		 * using snapshot half-way through the subxact.
-		 */
-		command_id = txn->command_id;
-
-		/*
+		 *
 		 * We can't use txn->snapshot_now directly because after the last
 		 * streaming run, we might have got some new sub-transactions. So we
 		 * need to add them to the snapshot.
 		 */
 		snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
-											 txn, command_id);
+											 txn, txn->command_id);
+	}
+
+	return snapshot_now;
+}
+
+/*
+ * Send data of a large transaction (and its subtransactions) to the
+ * output plugin, but using the stream API.
+ */
+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	Snapshot	snapshot_now;
+	Size		stream_bytes;
+	bool		txn_is_streamed;
+
+	/* We can never reach here for a subtransaction. */
+	Assert(rbtxn_is_toptxn(txn));
+
+	snapshot_now = ReorderBufferStreamTXNSnapshot(rb, txn);
+
+	/* This transaction didn't make any changes to the database till now. */
+	if (snapshot_now == NULL)
+		return;
 
+	if (txn->snapshot_now != NULL)
+	{
 		/* Free the previously copied snapshot. */
 		Assert(txn->snapshot_now->copied);
 		ReorderBufferFreeSnap(rb, txn->snapshot_now);
@@ -4159,7 +4292,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	/* Process and send the changes to output plugin. */
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
-							command_id, true);
+							snapshot_now->curcid, true);
 
 	rb->streamCount += 1;
 	rb->streamBytes += stream_bytes;
@@ -5348,3 +5481,157 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Flush mapping entries when the corresponding relations are updated in pg_class.
+ */
+static void
+RelFileLocatorCacheInvalidateCallback(Datum arg, Oid relid)
+{
+	HASH_SEQ_STATUS status;
+	ReorderBufferRelFileLocatorEnt *entry;
+
+	if (!RelFileLocatorFilterCache)
+		return;
+
+	hash_seq_init(&status, RelFileLocatorFilterCache);
+
+	/* slightly inefficient algorithm but not performance critical path */
+	while ((entry = (ReorderBufferRelFileLocatorEnt *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * If relid is InvalidOid, signaling a complete reset, we must remove
+		 * all entries, otherwise just remove the specific relation's entry.
+		 * Always remove negative cache entries.
+		 */
+		if (relid == InvalidOid ||	/* complete reset */
+			entry->relid == InvalidOid ||	/* negative cache entry */
+			entry->relid == relid)	/* individual flushed relation */
+		{
+			if (hash_search(RelFileLocatorFilterCache,
+							&entry->key,
+							HASH_REMOVE,
+							NULL) == NULL)
+				elog(ERROR, "hash table corrupted");
+		}
+	}
+}
+
+/*
+ * Context reset/delete callback for RelFileLocatorFilterCache.
+ */
+static void
+ReorderBufferMemoryResetcallback(void *arg)
+{
+	RelFileLocatorFilterCache = NULL;
+}
+
+/*
+ * Determine whether the record corresponding to the relation identified by
+ * 'rlocator' needs to be filtered and not decoded and queued in the buffer.
+ *
+ * Determining the necessity of decoding requires accessing relation
+ * information from the system catalog, which requires a historical snapshot.
+ * We cannot directly use the current snapshot of the transaction due to the
+ * presence of INTERNAL_SNAPSHOT, COMMAND_ID, or INVALIDATION records in the
+ * buffer that could modify the snapshot. A proper snapshot can only be
+ * constructed after these records are processed in ReorderBufferProcessTXN, or
+ * if we are decoding a transaction without these records. See comment on top
+ * of ReorderBufferGetRelation() to see list of relations that are not
+ * decoded by the reorderbuffer.
+ *
+ * To reduce the overhead from the system catalog access and transaction
+ * management, the results are cached in the 'RelFileLocatorFilterCache' hash
+ * table.
+ *
+ * Returns true if the relation can be filtered; otherwise, false.
+ */
+bool
+ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+									XLogRecPtr lsn, RelFileLocator *rlocator,
+									bool has_tuple)
+{
+	bool		found;
+	Relation	relation;
+	bool		using_subtxn;
+	Snapshot	snapshot_now;
+	ReorderBufferTXN *txn,
+			   *toptxn;
+	ReorderBufferRelFileLocatorEnt *entry;
+	ReorderBufferRelFileLocatorKey key;
+
+	Assert(RelFileLocatorFilterCache);
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+	Assert(txn);
+	toptxn = rbtxn_get_toptxn(txn);
+	rb->can_filter_change = false;
+
+	/*
+	 * We cannot construct an accurate historical snapshot until all pending
+	 * records in this transaction that might update the snapshot are
+	 * processed.
+	 */
+	if (rbtxn_has_snapshot_changes(toptxn))
+		return false;
+
+	key.reltablespace = rlocator->spcOid;
+	key.relfilenumber = rlocator->relNumber;
+	entry = hash_search(RelFileLocatorFilterCache, &key, HASH_ENTER, &found);
+
+	if (found)
+	{
+		rb->can_filter_change = entry->filterable;
+		return entry->filterable;
+	}
+
+	/* constructs a temporary historical snapshot */
+	snapshot_now = ReorderBufferStreamTXNSnapshot(rb, toptxn);
+	Assert(snapshot_now);
+
+	/* build data to be able to lookup the CommandIds of catalog tuples */
+	ReorderBufferBuildTupleCidHash(rb, toptxn);
+
+	/* setup the initial snapshot */
+	SetupHistoricSnapshot(snapshot_now, toptxn->tuplecid_hash);
+
+	using_subtxn = IsTransactionOrTransactionBlock();
+
+	if (using_subtxn)
+		BeginInternalSubTransaction("filter change by RelFileLocator");
+	else
+		StartTransactionCommand();
+
+	relation = ReorderBufferGetRelation(rb, rlocator, has_tuple);
+
+	if (RelationIsValid(relation))
+	{
+		entry->relid = RelationGetRelid(relation);
+		entry->filterable = false;
+		RelationClose(relation);
+	}
+	else
+	{
+		entry->relid = InvalidOid;
+		entry->filterable = true;
+	}
+
+	rb->can_filter_change = entry->filterable;
+
+	ReorderBufferFreeSnap(rb, snapshot_now);
+
+	TeardownHistoricSnapshot(false);
+
+	/*
+	 * Aborting the current (sub-)transaction as a whole has the right
+	 * semantics. We want all locks acquired in here to be released, not
+	 * reassigned to the parent and we do not want any database access have
+	 * persistent effects.
+	 */
+	AbortCurrentTransaction();
+
+	if (using_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+
+	return entry->filterable;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index b0f26c7..7d805d4 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -622,6 +622,7 @@ struct ReorderBuffer
 	 * Private memory context.
 	 */
 	MemoryContext context;
+	MemoryContextCallback relfile_callback;
 
 	/*
 	 * Memory contexts for specific types objects
@@ -642,6 +643,12 @@ struct ReorderBuffer
 	/* Max-heap for sizes of all top-level and sub transactions */
 	pairingheap *txn_heap;
 
+	/* should we try to filter the change? */
+	bool		can_filter_change;
+
+	/* number of changes after a failed attempt at filtering */
+	int8		processed_changes;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
@@ -742,4 +749,9 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
 
+extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+												XLogRecPtr lsn, RelFileLocator *rlocator,
+												bool has_tuple);
+extern bool ReorderBufferCanFilterChanges(ReorderBuffer *rb);
+
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 656ecd9..9b43262 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2433,6 +2433,8 @@ ReorderBufferIterTXNEntry
 ReorderBufferIterTXNState
 ReorderBufferMessageCB
 ReorderBufferPrepareCB
+ReorderBufferRelFileLocatorEnt
+ReorderBufferRelFileLocatorKey
 ReorderBufferRollbackPreparedCB
 ReorderBufferStreamAbortCB
 ReorderBufferStreamChangeCB
-- 
1.8.3.1

