From 27d7c257dc37efdd31a915efe2ff7220ac323409 Mon Sep 17 00:00:00 2001
From: Ajin Cherian <itsajin@gmail.com>
Date: Mon, 3 Mar 2025 04:52:05 -0500
Subject: [PATCH v15 1/3] Filter transactions that need not be published.

This patch set aims to filter transactions at the decode stage rather than
streaming time, which allows the system to skip processing transactions that do
not contain tables included in the logical replication walsender's publication
list. As a result, this can prevent the serialization of unnecessary records to
disk during non-streaming mode, especially in large transactions, and also
reduce memory and CPU usage in streaming mode when many changes can be filtered
out.

A hash cache of relation file locators is implemented to cache whether a
relation is filterable or not. This ensures that we only need to retrieve
relation and publication information from the catalog when a cache entry is
invalid, avoiding the overhead of starting a transaction for each record.

Filtering is temporarily suspended for a sequence of changes (set to 100
changes) when an unfilterable change is encountered. This strategy minimizes
the overhead of hash searching for every record, which is beneficial when the
majority of tables in an instance are published and thus unfilterable. The
threshold of 100 was determined to be the optimal balance based on performance
tests.

Additionally, filtering is paused for transactions containing WAL records
(INTERNAL_SNAPSHOT, COMMAND_ID, or INVALIDATION) that modify the historical
snapshot constructed during logical decoding. This pause is necessary because
constructing a correct historical snapshot for searching publication
information requires processing these WAL records.

Note that this patch filters changes only for system catalog relations,
non-logically logged relations, or temporary heaps and sequences. A subsequent
patch will introduce a new output plugin in pgoutput, which will further filter
changes for relations not included in publications.
---
 src/backend/replication/logical/decode.c        |   5 +
 src/backend/replication/logical/reorderbuffer.c | 467 ++++++++++++++++++++----
 src/include/replication/reorderbuffer.h         |  18 +
 src/tools/pgindent/typedefs.list                |   2 +
 4 files changed, 423 insertions(+), 69 deletions(-)

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 24d88f3..d8e0114 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -915,6 +915,11 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
 
+	if (ctx->reorder->can_filter_change &&
+		ReorderBufferFilterByRelFileLocator(ctx->reorder, XLogRecGetXid(r),
+											buf->origptr, &target_locator))
+		return;
+
 	change = ReorderBufferGetChange(ctx->reorder);
 	if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
 		change->action = REORDER_BUFFER_CHANGE_INSERT;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 5186ad2..5c30cf0 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -82,6 +82,32 @@
  *	  a bit more memory to the oldest subtransactions, because it's likely
  *	  they are the source for the next sequence of changes.
  *
+ *	  We also try to filter changes at the decode stage rather than at
+ *	  streaming time, which allows the system to skip processing txns that
+ *	  do not contain tables that need to be decoded. As a result,
+ *	  this can prevent the serialization of unnecessary records to disk
+ *	  during non-streaming mode, especially in large transactions, and also
+ *	  reduce memory and CPU usage in streaming mode when many changes can be
+ *	  filtered out.
+ *
+ *	  A hash cache of relation file locators is implemented to cache whether a
+ *	  relation is filterable or not. This ensures that we only need to retrieve
+ *	  relation and publication information from the catalog when a cache entry is
+ *	  invalid, avoiding the overhead of starting a transaction for each record.
+ *
+ *	  Filtering is temporarily suspended for a sequence of changes (set to 100
+ *	  changes) when an unfilterable change is encountered. This strategy minimizes
+ *	  the overhead of hash searching for every record, which is beneficial when the
+ *	  majority of tables in an instance are published and thus unfilterable. The
+ *	  threshold of 100 was determined to be the optimal balance based on performance
+ *	  tests.
+ *
+ *	  Additionally, filtering is paused for transactions containing WAL records
+ *	  (INTERNAL_SNAPSHOT, COMMAND_ID, or INVALIDATION) that modify the historical
+ *	  snapshot constructed during logical decoding. This pause is necessary because
+ *	  constructing a correct historical snapshot for searching publication
+ *	  information requires processing these WAL records.
+ *
  * -------------------------------------------------------------------------
  */
 #include "postgres.h"
@@ -109,6 +135,7 @@
 #include "storage/procarray.h"
 #include "storage/sinval.h"
 #include "utils/builtins.h"
+#include "utils/inval.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
@@ -227,8 +254,10 @@ static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
 											   XLogRecPtr lsn, bool create_as_top);
 static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
 											  ReorderBufferTXN *subtxn);
-
 static void AssertTXNLsnOrder(ReorderBuffer *rb);
+static Relation GetDecodableRelation(ReorderBuffer *rb,
+										 RelFileLocator *rlocator,
+										 bool has_tuple);
 
 /* ---------------------------------------
  * support functions for lsn-order iterating over the ->changes of a
@@ -279,6 +308,8 @@ static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
  */
 static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
+static Snapshot ReorderBufferStreamTXNSnapshot(ReorderBuffer *rb,
+											   ReorderBufferTXN *txn);
 static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
 
@@ -305,6 +336,48 @@ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											bool addition, Size sz);
 
 /*
+ * ---------------------------------------
+ * RelFileLocator filtering
+ * ---------------------------------------
+ * This hash table serves as a lookup table for determining if a relation can
+ * be filtered before being decoded and queued into the buffer.
+ *
+ * The hash table shares the same lifespan as the reorder buffer. This is
+ * crucial because each reorderbuffer may have different configurations or be
+ * associated with different output plugins, affecting the types of tables to
+ * be processed.
+ */
+
+static HTAB *RelFileLocatorFilterCache = NULL;
+
+static bool relation_callbacks_registered = false;
+
+typedef struct ReorderBufferRelFileLocatorKey
+{
+	Oid			reltablespace;
+	RelFileNumber relfilenumber;
+} ReorderBufferRelFileLocatorKey;
+
+/* Hash table entry used to determine if the relation can be filtered. */
+typedef struct ReorderBufferRelFileLocatorEnt
+{
+	ReorderBufferRelFileLocatorKey key;
+	Oid			relid;
+	bool		filterable;
+} ReorderBufferRelFileLocatorEnt;
+
+static void RelFileLocatorCacheInvalidateCallback(Datum arg, Oid relid);
+static void ReorderBufferMemoryResetcallback(void *arg);
+
+/*
+ * After encountering a change that cannot be filtered out, filtering is
+ * temporarily suspended. Filtering resumes after processing every 100 changes.
+ * This strategy helps to minimize the overhead of performing a hash table
+ * search for each record, especially when most changes are not filterable.
+ */
+#define CHANGES_THRESHOLD_FOR_FILTER 100
+
+/*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
  * prior ReorderBuffer instances for the same slot.
  */
@@ -362,6 +435,34 @@ ReorderBufferAllocate(void)
 	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
 								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
+	/*
+	 * To support early filtering of changes, this hash table acts
+	 * as a lookup table to determine if the corresponding relation is
+	 * required to be decoded and queued into the buffer. The hash table
+	 * shares the same lifespan as the reorder buffer.
+	 *
+	 * Also setup the callback to invalidate cache when relations are updated.
+	 */
+	hash_ctl.keysize = sizeof(ReorderBufferRelFileLocatorKey);
+	hash_ctl.entrysize = sizeof(ReorderBufferRelFileLocatorEnt);
+	hash_ctl.hcxt = buffer->context;
+
+	RelFileLocatorFilterCache =
+		hash_create("RelFileLocatorFilterCache", 1000, &hash_ctl,
+					HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+
+	buffer->relfile_callback.arg = NULL;
+	buffer->relfile_callback.func = ReorderBufferMemoryResetcallback;
+	MemoryContextRegisterResetCallback(buffer->context, &buffer->relfile_callback);
+
+	if (!relation_callbacks_registered)
+	{
+		/* Watch for invalidation events. */
+		CacheRegisterRelcacheCallback(RelFileLocatorCacheInvalidateCallback,
+									  (Datum) 0);
+		relation_callbacks_registered = true;
+	}
+
 	buffer->by_txn_last_xid = InvalidTransactionId;
 	buffer->by_txn_last_txn = NULL;
 
@@ -372,6 +473,8 @@ ReorderBufferAllocate(void)
 	/* txn_heap is ordered by transaction size */
 	buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);
 
+	buffer->can_filter_change = true;
+
 	buffer->spillTxns = 0;
 	buffer->spillCount = 0;
 	buffer->spillBytes = 0;
@@ -826,6 +929,14 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 
 		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
 	}
+	else if (change->action == REORDER_BUFFER_CHANGE_INVALIDATION ||
+			 change->action == REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT ||
+			 change->action == REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID)
+	{
+		ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
+
+		toptxn->txn_flags |= RBTXN_HAS_SNAPSHOT_CHANGES;
+	}
 
 	change->lsn = lsn;
 	change->txn = txn;
@@ -835,6 +946,17 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	txn->nentries++;
 	txn->nentries_mem++;
 
+	/*
+	 * If filtering is currently suspended and we've crossed the change threshold,
+	 * attempt to filter again
+	 */
+	if (!rb->can_filter_change && (++rb->unfiltered_changes_count
+									>= CHANGES_THRESHOLD_FOR_FILTER))
+	{
+		rb->can_filter_change = true;
+		rb->unfiltered_changes_count = 0;
+	}
+
 	/* update memory accounting information */
 	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
 									ReorderBufferChangeSize(change));
@@ -1734,6 +1856,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
 		txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
 	}
 
+	/* All snapshot changes up to this point have been processed. */
+	txn->txn_flags &= ~RBTXN_HAS_SNAPSHOT_CHANGES;
+
 	/* also reset the number of entries in the transaction */
 	txn->nentries_mem = 0;
 	txn->nentries = 0;
@@ -2177,6 +2302,84 @@ ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 }
 
 /*
+ * Return the relation corresponding to the given RelFileLocator 'rlocator' if
+ * relevant to decoding. Otherwise, return NULL.
+ *
+ * We don't decode catalog files, relations that are not logically logged,
+ * temporary heaps and sequences.
+ */
+static Relation
+GetDecodableRelation(ReorderBuffer *rb, RelFileLocator *rlocator,
+						 bool has_tuple)
+{
+	bool		filterable = false;
+	Relation	relation;
+	Oid			reloid;
+	RelPathStr  path;
+
+	reloid = RelidByRelfilenumber(rlocator->spcOid, rlocator->relNumber);
+	path = relpathperm(*rlocator, MAIN_FORKNUM);
+
+	if (!OidIsValid(reloid))
+	{
+		if (!has_tuple)
+		{
+			/*
+			 * Mapped catalog tuple without data, emitted while catalog table was in
+			 * the process of being rewritten. We can fail to look up the
+			 * relfilenumber, because the relmapper has no "historic" view, in
+			 * contrast to the normal catalog during decoding. Thus repeated rewrites
+			 * can cause a lookup failure. That's OK because we do not decode catalog
+			 * changes anyway. Normally such tuples would be skipped over below, but
+			 * we can't identify whether the table should be logically logged without
+			 * mapping the relfilenumber to the oid.
+			 */
+			return NULL;
+		}
+
+		elog(ERROR, "could not map filenumber \"%s\" to relation OID",
+				path.str);
+	}
+
+	relation = RelationIdGetRelation(reloid);
+
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+			 reloid, path.str);
+
+	if (!RelationIsLogicallyLogged(relation))
+		filterable = true;
+
+	else if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+	{
+		/*
+		 * Ignore temporary heaps created during DDL unless the plugin has asked
+		 * for them.
+		 */
+		filterable = true;
+	}
+
+	else if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+	{
+		/*
+		 * For now ignore sequence changes entirely. Most of the time they don't
+		 * log changes using records we understand, so it doesn't make sense to
+		 * handle the few cases we do.
+		 */
+		filterable = true;
+	}
+
+	/* Return NULL to indicate that this relation need not be decoded. */
+	if (filterable)
+	{
+		RelationClose(relation);
+		return NULL;
+	}
+
+	return relation;
+}
+
+/*
  * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
  *
  * Send data of a transaction (and its subtransactions) to the
@@ -2248,7 +2451,6 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
 		{
 			Relation	relation = NULL;
-			Oid			reloid;
 
 			CHECK_FOR_INTERRUPTS();
 
@@ -2307,55 +2509,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_DELETE:
 					Assert(snapshot_now);
 
-					reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
-												  change->data.tp.rlocator.relNumber);
-
-					/*
-					 * Mapped catalog tuple without data, emitted while
-					 * catalog table was in the process of being rewritten. We
-					 * can fail to look up the relfilenumber, because the
-					 * relmapper has no "historic" view, in contrast to the
-					 * normal catalog during decoding. Thus repeated rewrites
-					 * can cause a lookup failure. That's OK because we do not
-					 * decode catalog changes anyway. Normally such tuples
-					 * would be skipped over below, but we can't identify
-					 * whether the table should be logically logged without
-					 * mapping the relfilenumber to the oid.
-					 */
-					if (reloid == InvalidOid &&
-						change->data.tp.newtuple == NULL &&
-						change->data.tp.oldtuple == NULL)
-						goto change_done;
-					else if (reloid == InvalidOid)
-						elog(ERROR, "could not map filenumber \"%s\" to relation OID",
-							 relpathperm(change->data.tp.rlocator,
-										 MAIN_FORKNUM).str);
-
-					relation = RelationIdGetRelation(reloid);
-
+					relation = GetDecodableRelation(rb, &change->data.tp.rlocator,
+														(change->data.tp.newtuple != NULL ||
+														 change->data.tp.oldtuple != NULL));
 					if (!RelationIsValid(relation))
-						elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
-							 reloid,
-							 relpathperm(change->data.tp.rlocator,
-										 MAIN_FORKNUM).str);
-
-					if (!RelationIsLogicallyLogged(relation))
-						goto change_done;
-
-					/*
-					 * Ignore temporary heaps created during DDL unless the
-					 * plugin has asked for them.
-					 */
-					if (relation->rd_rel->relrewrite && !rb->output_rewrites)
-						goto change_done;
-
-					/*
-					 * For now ignore sequence changes entirely. Most of the
-					 * time they don't log changes using records we
-					 * understand, so it doesn't make sense to handle the few
-					 * cases we do.
-					 */
-					if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
 						goto change_done;
 
 					/* user-triggered change */
@@ -4171,19 +4328,13 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 }
 
 /*
- * Send data of a large transaction (and its subtransactions) to the
- * output plugin, but using the stream API.
+ * This function generates or retrieves a consistent snapshot of a transaction
+ * that is currently in progress for logical replication.
  */
-static void
-ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+static Snapshot
+ReorderBufferStreamTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
 	Snapshot	snapshot_now;
-	CommandId	command_id;
-	Size		stream_bytes;
-	bool		txn_is_streamed;
-
-	/* We can never reach here for a subtransaction. */
-	Assert(rbtxn_is_toptxn(txn));
 
 	/*
 	 * We can't make any assumptions about base snapshot here, similar to what
@@ -4226,12 +4377,11 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		if (txn->base_snapshot == NULL)
 		{
 			Assert(txn->ninvalidations == 0);
-			return;
+			return NULL;
 		}
 
-		command_id = FirstCommandId;
 		snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
-											 txn, command_id);
+											 txn, FirstCommandId);
 	}
 	else
 	{
@@ -4244,17 +4394,40 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		 * the LSN condition in the previous branch (so no need to walk
 		 * through subxacts again). In fact, we must not do that as we may be
 		 * using snapshot half-way through the subxact.
-		 */
-		command_id = txn->command_id;
-
-		/*
+		 *
 		 * We can't use txn->snapshot_now directly because after the last
 		 * streaming run, we might have got some new sub-transactions. So we
 		 * need to add them to the snapshot.
 		 */
 		snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
-											 txn, command_id);
+											 txn, txn->command_id);
+	}
+
+	return snapshot_now;
+}
+
+/*
+ * Send data of a large transaction (and its subtransactions) to the
+ * output plugin, but using the stream API.
+ */
+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	Snapshot	snapshot_now;
+	Size		stream_bytes;
+	bool		txn_is_streamed;
+
+	/* We can never reach here for a subtransaction. */
+	Assert(rbtxn_is_toptxn(txn));
+
+	snapshot_now = ReorderBufferStreamTXNSnapshot(rb, txn);
+
+	/* This transaction didn't make any changes to the database till now. */
+	if (snapshot_now == NULL)
+		return;
 
+	if (txn->snapshot_now != NULL)
+	{
 		/* Free the previously copied snapshot. */
 		Assert(txn->snapshot_now->copied);
 		ReorderBufferFreeSnap(rb, txn->snapshot_now);
@@ -4272,7 +4445,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
 	/* Process and send the changes to output plugin. */
 	ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
-							command_id, true);
+							snapshot_now->curcid, true);
 
 	rb->streamCount += 1;
 	rb->streamBytes += stream_bytes;
@@ -5461,3 +5634,159 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Flush mapping entries when the corresponding relations are updated in pg_class.
+ * If relid is InvalidOid then do a complete reset of the RelFileLocatorFilterCache.
+ */
+static void
+RelFileLocatorCacheInvalidateCallback(Datum arg, Oid relid)
+{
+	HASH_SEQ_STATUS status;
+	ReorderBufferRelFileLocatorEnt *entry;
+
+	if (!RelFileLocatorFilterCache)
+		return;
+
+	hash_seq_init(&status, RelFileLocatorFilterCache);
+
+	/* Slightly inefficient algorithm but not performance critical path */
+	while ((entry = (ReorderBufferRelFileLocatorEnt *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * If a complete reset is requested (when relid parameter is InvalidOid),
+		 * we must remove all entries, otherwise just remove the specific relation's
+		 * entry. Remove any invalid cache entries (these are indicated by invalid
+		 * entry->relid)
+		 */
+		if (relid == InvalidOid ||	/* complete reset */
+			entry->relid == InvalidOid ||	/* invalid cache entry */
+			entry->relid == relid)	/* individual flushed relation */
+		{
+			if (hash_search(RelFileLocatorFilterCache,
+							&entry->key,
+							HASH_REMOVE,
+							NULL) == NULL)
+				elog(ERROR, "hash table corrupted");
+		}
+	}
+}
+
+/*
+ * Context reset/delete callback for RelFileLocatorFilterCache.
+ */
+static void
+ReorderBufferMemoryResetcallback(void *arg)
+{
+	RelFileLocatorFilterCache = NULL;
+}
+
+/*
+ * Determine whether the record corresponding to the relation identified by
+ * 'rlocator' needs to be filtered and not decoded and queued in the buffer.
+ *
+ * Determining the necessity of decoding requires accessing relation
+ * information from the system catalog, which requires a historical snapshot.
+ * We cannot directly use the current snapshot of the transaction due to the
+ * presence of INTERNAL_SNAPSHOT, COMMAND_ID, or INVALIDATION records in the
+ * buffer that could modify the snapshot. A proper snapshot can only be
+ * constructed after these records are processed in ReorderBufferProcessTXN, or
+ * if we are decoding a transaction without these records. See comment on top
+ * of GetDecodableRelation() to see list of relations that are not
+ * decoded by the reorderbuffer.
+ *
+ * To reduce the overhead from the system catalog access and transaction
+ * management, the results are cached in the 'RelFileLocatorFilterCache' hash
+ * table.
+ *
+ * Returns true if the relation can be filtered; otherwise, false.
+ */
+bool
+ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+									XLogRecPtr lsn, RelFileLocator *rlocator)
+
+{
+	bool		found;
+	Relation	relation;
+	bool		using_subtxn;
+	Snapshot	snapshot_now;
+	ReorderBufferTXN *txn,
+			   *toptxn;
+	ReorderBufferRelFileLocatorEnt *entry;
+	ReorderBufferRelFileLocatorKey key;
+
+	Assert(RelFileLocatorFilterCache);
+
+	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+	Assert(txn);
+	toptxn = rbtxn_get_toptxn(txn);
+	rb->can_filter_change = false;
+
+	/*
+	 * We cannot construct an accurate historical snapshot until all pending
+	 * records in this transaction that might update the snapshot are
+	 * processed.
+	 */
+	if (rbtxn_has_snapshot_changes(toptxn))
+		return false;
+
+	key.reltablespace = rlocator->spcOid;
+	key.relfilenumber = rlocator->relNumber;
+	entry = hash_search(RelFileLocatorFilterCache, &key, HASH_ENTER, &found);
+
+	if (found)
+	{
+		rb->can_filter_change = entry->filterable;
+		return entry->filterable;
+	}
+
+	/* constructs a temporary historical snapshot */
+	snapshot_now = ReorderBufferStreamTXNSnapshot(rb, toptxn);
+	Assert(snapshot_now);
+
+	/* build data to be able to lookup the CommandIds of catalog tuples */
+	ReorderBufferBuildTupleCidHash(rb, toptxn);
+
+	/* setup the initial snapshot */
+	SetupHistoricSnapshot(snapshot_now, toptxn->tuplecid_hash);
+
+	using_subtxn = IsTransactionOrTransactionBlock();
+
+	if (using_subtxn)
+		BeginInternalSubTransaction("filter change by RelFileLocator");
+	else
+		StartTransactionCommand();
+
+	relation = GetDecodableRelation(rb, rlocator, true);
+
+	if (RelationIsValid(relation))
+	{
+		entry->relid = RelationGetRelid(relation);
+		entry->filterable = false;
+		RelationClose(relation);
+	}
+	else
+	{
+		entry->relid = InvalidOid;
+		entry->filterable = true;
+	}
+
+	rb->can_filter_change = entry->filterable;
+
+	ReorderBufferFreeSnap(rb, snapshot_now);
+
+	TeardownHistoricSnapshot(false);
+
+	/*
+	 * Aborting the current (sub-)transaction as a whole has the right
+	 * semantics. We want all locks acquired in here to be released, not
+	 * reassigned to the parent and we do not want any database access have
+	 * persistent effects.
+	 */
+	AbortCurrentTransaction();
+
+	if (using_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+
+	return entry->filterable;
+}
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 517a8e3..24166c2 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -176,6 +176,7 @@ typedef struct ReorderBufferChange
 #define RBTXN_SENT_PREPARE			0x0200
 #define RBTXN_IS_COMMITTED			0x0400
 #define RBTXN_IS_ABORTED			0x0800
+#define RBTXN_HAS_SNAPSHOT_CHANGES  0x1000
 
 #define RBTXN_PREPARE_STATUS_MASK	(RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE | RBTXN_SENT_PREPARE)
 
@@ -215,6 +216,12 @@ typedef struct ReorderBufferChange
 	((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
 )
 
+/* Does this transaction make changes to the current snapshot? */
+#define rbtxn_has_snapshot_changes(txn) \
+( \
+	((txn)->txn_flags & RBTXN_HAS_SNAPSHOT_CHANGES) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *
@@ -641,6 +648,7 @@ struct ReorderBuffer
 	 * Private memory context.
 	 */
 	MemoryContext context;
+	MemoryContextCallback relfile_callback;
 
 	/*
 	 * Memory contexts for specific types objects
@@ -661,6 +669,12 @@ struct ReorderBuffer
 	/* Max-heap for sizes of all top-level and sub transactions */
 	pairingheap *txn_heap;
 
+	/* should we try to filter the change? */
+	bool		can_filter_change;
+
+	/* number of changes after a failed attempt at filtering */
+	int8		unfiltered_changes_count;
+
 	/*
 	 * Statistics about transactions spilled to disk.
 	 *
@@ -761,4 +775,8 @@ extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
 
+extern bool ReorderBufferFilterByRelFileLocator(ReorderBuffer *rb, TransactionId xid,
+												XLogRecPtr lsn, RelFileLocator *rlocator);
+extern bool ReorderBufferCanFilterChanges(ReorderBuffer *rb);
+
 #endif
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 56989aa..fc12839 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2449,6 +2449,8 @@ ReorderBufferIterTXNEntry
 ReorderBufferIterTXNState
 ReorderBufferMessageCB
 ReorderBufferPrepareCB
+ReorderBufferRelFileLocatorEnt
+ReorderBufferRelFileLocatorKey
 ReorderBufferRollbackPreparedCB
 ReorderBufferStreamAbortCB
 ReorderBufferStreamChangeCB
-- 
1.8.3.1

