From 1fdc49f2b8b16b4c39759cc5f8cd3e5a0d6bcd55 Mon Sep 17 00:00:00 2001
From: "adger.lj" <adger.lj@alibaba-inc.com>
Date: Wed, 6 Mar 2024 15:21:17 +0800
Subject: [PATCH] Reduce useless changes before reassembly during logical
 replication

To reduce unnecessary work during logical replication, changes to relations
that are not published can be filtered out before transactions are
reassembled in the reorder buffer.  This avoids queuing useless changes and
prevents storage from being filled up with irrelevant data.  Even when
streaming mode is used, this patch reduces CPU and memory consumption.

A new output plugin callback, LogicalDecodeFilterByRelCB, is added, and
pgoutput implements it as pgoutput_table_filter.  RelationSyncCache is reused
to decide whether a change to a given relation should be filtered out.
Following the implementation of pgoutput_change, only insert/update/delete
changes can currently be filtered; other change types are not considered.
With further analysis, more kinds of changes could be filtered.

On the decoding side, a new function ReorderBufferFilterByLocator decides,
based on the RelFileLocator in the WAL record, whether a change should be added
to the transaction's change list.  To do so it must map the RelFileLocator to
its relation and then check whether that relation's DML is published.  This
requires starting a transaction, using a historic snapshot and accessing the
caches.  To avoid that overhead, a new cache maps each RelFileLocator to its
PublicationActions.  The extra cost is therefore only incurred when the
PublicationActions of a new RelFileLocator are looked up; in most cases they
are taken directly from LocatorFilterCache.
---
 src/backend/replication/logical/decode.c      |  17 +-
 src/backend/replication/logical/logical.c     |  28 ++
 .../replication/logical/reorderbuffer.c       | 366 +++++++++++++++++-
 src/backend/replication/pgoutput/pgoutput.c   |  28 +-
 src/include/replication/logical.h             |   3 +
 src/include/replication/output_plugin.h       |   9 +
 src/include/replication/reorderbuffer.h       |   1 +
 src/test/subscription/t/034_table_filter.pl   |  89 +++++
 src/tools/pgindent/typedefs.list              |   1 +
 9 files changed, 532 insertions(+), 10 deletions(-)
 create mode 100644 src/test/subscription/t/034_table_filter.pl

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e5ab7b78b7..7e59356dc6 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -152,7 +152,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_PARAMETER_CHANGE:
 			{
 				xl_parameter_change *xlrec =
-					(xl_parameter_change *) XLogRecGetData(buf->record);
+				(xl_parameter_change *) XLogRecGetData(buf->record);
 
 				/*
 				 * If wal_level on the primary is reduced to less than
@@ -914,6 +914,11 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
+	/* only interested in our published tables. */
+	if (ReorderBufferFilterByLocator(ctx->reorder, XLogRecGetXid(r), &target_locator,
+									 REORDER_BUFFER_CHANGE_INSERT, buf->origptr))
+		return;
+
 	/* output plugin doesn't look for this origin, no need to queue */
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
@@ -964,6 +969,11 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
+	/* only interested in our published tables. */
+	if (ReorderBufferFilterByLocator(ctx->reorder, XLogRecGetXid(r), &target_locator,
+									 REORDER_BUFFER_CHANGE_UPDATE, buf->origptr))
+		return;
+
 	/* output plugin doesn't look for this origin, no need to queue */
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
@@ -1030,6 +1040,11 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (target_locator.dbOid != ctx->slot->data.database)
 		return;
 
+	/* only interested in our published tables. */
+	if (ReorderBufferFilterByLocator(ctx->reorder, XLogRecGetXid(r), &target_locator,
+									 REORDER_BUFFER_CHANGE_DELETE, buf->origptr))
+		return;
+
 	/* output plugin doesn't look for this origin, no need to queue */
 	if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
 		return;
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 51ffb623c0..9be22862e4 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1235,6 +1235,34 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+/* Call the output plugin's filter_by_table callback under an error context. */
+void
+filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, PublicationActions *pubactions)
+{
+	LogicalErrorCallbackState cbstate;
+	ErrorContextCallback errctx;
+
+	Assert(!ctx->fast_forward);
+
+	/* Output state for the callback: no writes accepted, not end of xact */
+	ctx->accept_writes = false;
+	ctx->end_xact = false;
+
+	/* Describe the callback for error reports and push the error context */
+	cbstate.ctx = ctx;
+	cbstate.callback_name = "filter_by_table";
+	cbstate.report_location = InvalidXLogRecPtr;
+	errctx.callback = output_plugin_error_callback;
+	errctx.arg = (void *) &cbstate;
+	errctx.previous = error_context_stack;
+	error_context_stack = &errctx;
+
+	ctx->callbacks.filter_by_table_cb(ctx, relation, pubactions);
+
+	/* Pop the error context stack */
+	error_context_stack = errctx.previous;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 001f901ee6..ab2b43d271 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -92,6 +92,7 @@
 #include "access/xlog_internal.h"
 #include "catalog/catalog.h"
 #include "common/int.h"
+#include "common/string.h"
 #include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -104,8 +105,10 @@
 #include "storage/sinval.h"
 #include "utils/builtins.h"
 #include "utils/memutils.h"
+#include "utils/inval.h"
 #include "utils/rel.h"
 #include "utils/relfilenumbermap.h"
+#include "utils/syscache.h"
 
 
 /* entry for a hash table we use to map from xid to our transaction state */
@@ -211,6 +214,14 @@ static const Size max_changes_in_memory = 4096; /* XXX for restore only */
 /* GUC variable */
 int			debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
 
+static HTAB *LocatorFilterCache = NULL;	/* RelFileLocator -> LocatorFilterEntry */
+typedef struct LocatorFilterEntry
+{
+	RelFileLocator relfileocator;	/* hash key */
+	Oid			relid;			/* OID resolved from the locator; InvalidOid marks a negative entry */
+	PublicationActions pubactions;	/* actions reported by the output plugin's filter_by_table callback */
+} LocatorFilterEntry;
+
 /* ---------------------------------------
  * primary reorderbuffer support routines
  * ---------------------------------------
@@ -273,6 +284,7 @@ static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
 static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
 static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static Snapshot ReorderBufferStreamTXNSnapShot(ReorderBuffer *rb, ReorderBufferTXN *txn);
 
 /* ---------------------------------------
  * toast reassembly support
@@ -295,6 +307,10 @@ static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
 											ReorderBufferChange *change,
 											bool addition, Size sz);
 
+static void init_locator_filter_cache(MemoryContext cachectx);
+static void locator_filter_invalidate_syscache_cb(Datum arg, int cacheid, uint32 hashvalue);
+static void locator_filter_invalidate_relcache_cb(Datum arg, Oid relid);
+
 /*
  * Allocate a new ReorderBuffer and clean out any old serialized state from
  * prior ReorderBuffer instances for the same slot.
@@ -1404,7 +1420,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 	{
 		dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
 		ReorderBufferChange *next_change =
-			dlist_container(ReorderBufferChange, node, next);
+		dlist_container(ReorderBufferChange, node, next);
 
 		/* txn stays the same */
 		state->entries[off].lsn = next_change->lsn;
@@ -1435,8 +1451,8 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
 		{
 			/* successfully restored changes from disk */
 			ReorderBufferChange *next_change =
-				dlist_head_element(ReorderBufferChange, node,
-								   &entry->txn->changes);
+			dlist_head_element(ReorderBufferChange, node,
+							   &entry->txn->changes);
 
 			elog(DEBUG2, "restored %u/%u changes from disk",
 				 (uint32) entry->txn->nentries_mem,
@@ -3837,7 +3853,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 			{
 				char	   *data;
 				Size		inval_size = sizeof(SharedInvalidationMessage) *
-					change->data.inval.ninvalidations;
+				change->data.inval.ninvalidations;
 
 				sz += inval_size;
 
@@ -3980,6 +3996,38 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
 	return false;
 }
 
+/*
+ * Return a private copy of the top-level transaction's base snapshot, taken
+ * at txn->command_id, after calling ReorderBufferTransferSnapToParent() for
+ * every subtransaction.  Returns NULL if there is no base snapshot yet.
+ */
+static Snapshot
+ReorderBufferStreamTXNSnapShot(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+	dlist_iter	iter;
+
+	Assert(rbtxn_is_toptxn(txn));
+
+	/* No base snapshot means no changes so far, hence nothing to decode. */
+	if (txn->base_snapshot == NULL)
+	{
+		Assert(txn->ninvalidations == 0);
+		return NULL;
+	}
+
+	/* Hand the snapshots of all subtransactions over to the parent. */
+	dlist_foreach(iter, &txn->subtxns)
+	{
+		ReorderBufferTXN *child = dlist_container(ReorderBufferTXN, node,
+												  iter.cur);
+
+		ReorderBufferTransferSnapToParent(txn, child);
+	}
+
+	return ReorderBufferCopySnap(rb, txn->base_snapshot, txn,
+								 txn->command_id);
+}
+
 /*
  * Send data of a large transaction (and its subtransactions) to the
  * output plugin, but using the stream API.
@@ -4202,7 +4250,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	dlist_foreach_modify(cleanup_iter, &txn->changes)
 	{
 		ReorderBufferChange *cleanup =
-			dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
+		dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
 
 		dlist_delete(&cleanup->node);
 		ReorderBufferReturnChange(rb, cleanup, true);
@@ -4427,7 +4475,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INVALIDATION:
 			{
 				Size		inval_size = sizeof(SharedInvalidationMessage) *
-					change->data.inval.ninvalidations;
+				change->data.inval.ninvalidations;
 
 				change->data.inval.invalidations =
 					MemoryContextAlloc(rb->context, inval_size);
@@ -4932,7 +4980,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		dlist_foreach_modify(it, &ent->chunks)
 		{
 			ReorderBufferChange *change =
-				dlist_container(ReorderBufferChange, node, it.cur);
+			dlist_container(ReorderBufferChange, node, it.cur);
 
 			dlist_delete(&change->node);
 			ReorderBufferReturnChange(rb, change, true);
@@ -5270,3 +5318,307 @@ restart:
 		*cmax = ent->cmax;
 	return true;
 }
+
+/*
+ * Decide, from the RelFileLocator in a WAL record, whether a change can be
+ * dropped before it is added to the transaction's change list.
+ *
+ * Returns true if the change must be filtered out, false if it has to be
+ * queued as usual.  The decision comes from LocatorFilterCache, which maps a
+ * RelFileLocator to the PublicationActions reported by the output plugin's
+ * filter_by_table callback.  On a cache miss the relation is looked up under
+ * a temporary historic snapshot copied from the transaction's base snapshot,
+ * inside a (sub)transaction that is aborted again afterwards.
+ */
+bool
+ReorderBufferFilterByLocator(ReorderBuffer *rb, TransactionId xid, RelFileLocator *relfileocator, ReorderBufferChangeType action, XLogRecPtr lsn)
+{
+	LogicalDecodingContext *ctx = rb->private_data;
+	LocatorFilterEntry *entry;
+	PublicationActions pubactions = {0};
+	Oid			cached_relid = InvalidOid;
+	Oid			reloid;
+	Relation	relation = NULL;
+	Snapshot	snapshot_now;
+	ReorderBufferTXN *toptxn;
+	bool		found;
+	bool		using_subtxn;
+	bool		filter = false;
+
+	if (ctx->callbacks.filter_by_table_cb == NULL)
+		return false;
+
+	/*
+	 * Do not filter changes located before the point from which decoding
+	 * really starts, nor in fast-forward mode; later processing handles them.
+	 */
+	if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, lsn) || ctx->fast_forward)
+		return false;
+
+	if (LocatorFilterCache == NULL)
+		init_locator_filter_cache(CacheMemoryContext);
+
+	/* Find cached relation info, creating if not found */
+	entry = (LocatorFilterEntry *) hash_search(LocatorFilterCache, relfileocator,
+											   HASH_ENTER, &found);
+	Assert(entry != NULL);
+	if (found)
+	{
+		pubactions = entry->pubactions;
+		cached_relid = entry->relid;
+		goto filter_done;
+	}
+
+	/* Start out as a complete negative entry: no relation, nothing published */
+	entry->relid = InvalidOid;
+	entry->pubactions = pubactions;
+
+	/* Constructs a temporary historical snapshot. */
+	toptxn = rbtxn_get_toptxn(ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true));
+	snapshot_now = ReorderBufferStreamTXNSnapShot(rb, toptxn);
+	if (snapshot_now == NULL)
+	{
+		/* Nothing was looked up, so do not leave the all-false entry cached */
+		hash_search(LocatorFilterCache, relfileocator, HASH_REMOVE, NULL);
+		return false;
+	}
+
+	/* build data to be able to lookup the CommandIds of catalog tuples */
+	ReorderBufferBuildTupleCidHash(rb, toptxn);
+
+	/* setup the initial snapshot */
+	SetupHistoricSnapshot(snapshot_now, toptxn->tuplecid_hash);
+
+	using_subtxn = IsTransactionOrTransactionBlock();
+	if (using_subtxn)
+		BeginInternalSubTransaction("filter change by table");
+	else
+		StartTransactionCommand();
+
+	reloid = RelidByRelfilenumber(relfileocator->spcOid, relfileocator->relNumber);
+	if (reloid == InvalidOid)
+		goto init_cache_done;
+
+	relation = RelationIdGetRelation(reloid);
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+			 reloid, relpathperm(*relfileocator, MAIN_FORKNUM));
+
+	if (!RelationIsLogicallyLogged(relation))
+		goto init_cache_done;
+
+	/* Ignore temporary heaps created during DDL unless the plugin wants them */
+	if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+		goto init_cache_done;
+
+	/* Ignore sequences: they mostly log records we do not understand anyway */
+	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+		goto init_cache_done;
+
+	if (IsToastRelation(relation))
+	{
+		/* The owning table's OID follows the "pg_toast_" prefix of the name */
+		char	   *toast_name = RelationGetRelationName(relation);
+		Oid			real_reloid;
+
+		/* strtoul rather than strtoint, so OIDs above INT_MAX parse too */
+		real_reloid = (Oid) strtoul(toast_name + strlen("pg_toast_"), NULL, 10);
+		if (real_reloid == InvalidOid)
+			elog(ERROR, "cannot get the real table oid for toast table %s", toast_name);
+		RelationClose(relation);
+		relation = RelationIdGetRelation(real_reloid);
+		if (!RelationIsValid(relation))
+			elog(ERROR, "could not open real relation with OID %u (for toast table filenumber \"%s\")",
+				 real_reloid, relpathperm(*relfileocator, MAIN_FORKNUM));
+	}
+
+	filter_by_table_cb_wrapper(ctx, relation, &pubactions);
+	cached_relid = reloid;
+
+init_cache_done:
+	if (RelationIsValid(relation))
+		RelationClose(relation);
+
+	AbortCurrentTransaction();
+	if (using_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+
+	TeardownHistoricSnapshot(false);
+	pfree(snapshot_now);
+
+	/* Release the tuplecid hash built for this lookup instead of leaking it */
+	if (toptxn->tuplecid_hash != NULL)
+	{
+		hash_destroy(toptxn->tuplecid_hash);
+		toptxn->tuplecid_hash = NULL;
+	}
+
+	/*
+	 * Invalidation callbacks run during the catalog access may have removed
+	 * our entry: look it up again and cache the result only if still there.
+	 */
+	entry = (LocatorFilterEntry *) hash_search(LocatorFilterCache, relfileocator,
+											   HASH_FIND, NULL);
+	if (entry != NULL)
+	{
+		entry->relid = cached_relid;
+		entry->pubactions = pubactions;
+	}
+
+filter_done:
+	/* Filter the change unless its action is published for the relation */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			filter = !pubactions.pubinsert;
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			filter = !pubactions.pubupdate;
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			filter = !pubactions.pubdelete;
+			break;
+		default:
+			filter = true;
+	}
+
+	if (filter)
+		elog(DEBUG1, "logical filter change by table %u", cached_relid);
+
+	return filter;
+}
+
+
+/*
+ * Create the locator filter cache in 'cachectx' and register, once per
+ * backend, the invalidation callbacks that keep it consistent.
+ *
+ * A relcache invalidation drops the affected relation's entry; changes to
+ * pg_namespace, pg_publication, pg_publication_rel or
+ * pg_publication_namespace drop every entry.  NOTE(review): nothing in this
+ * patch destroys the table or resets it between decoding sessions - confirm.
+ */
+static void
+init_locator_filter_cache(MemoryContext cachectx)
+{
+	HASHCTL		ctl;
+	static bool relfile_callbacks_registered = false;
+
+	/* Nothing to do if hash table already exists */
+	if (LocatorFilterCache != NULL)
+		return;
+
+	/* Make a new hash table for the cache */
+	ctl.keysize = sizeof(RelFileLocator);
+	ctl.entrysize = sizeof(LocatorFilterEntry);
+	ctl.hcxt = cachectx;
+
+	LocatorFilterCache = hash_create("logical replication output RelFile cache",
+									 128, &ctl,
+									 HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
+
+	Assert(LocatorFilterCache != NULL);
+
+	/* Callbacks cannot be unregistered, so register them only once */
+	if (relfile_callbacks_registered)
+		return;
+
+	/* We must update the cache entry for a relation after a relcache flush */
+	CacheRegisterRelcacheCallback(locator_filter_invalidate_relcache_cb, (Datum) 0);
+
+	/*
+	 * Flush all cache entries after a pg_namespace change, in case it was a
+	 * schema rename affecting a relation being replicated.
+	 */
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  locator_filter_invalidate_syscache_cb,
+								  (Datum) 0);
+
+	/* Flush all cache entries after any publication change. */
+	CacheRegisterSyscacheCallback(PUBLICATIONOID,
+								  locator_filter_invalidate_syscache_cb,
+								  (Datum) 0);
+	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
+								  locator_filter_invalidate_syscache_cb,
+								  (Datum) 0);
+	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
+								  locator_filter_invalidate_syscache_cb,
+								  (Datum) 0);
+
+	relfile_callbacks_registered = true;
+}
+
+/*
+ * Syscache invalidation callback for the locator filter cache.
+ *
+ * Registered by init_locator_filter_cache(); any invalidation it receives
+ * empties the whole cache.
+ */
+static void
+locator_filter_invalidate_syscache_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS scan;
+	LocatorFilterEntry *victim;
+
+	/* Nothing to flush if the cache does not exist. */
+	if (LocatorFilterCache == NULL)
+		return;
+
+	/*
+	 * There is no easy way to tell which entries the event affects, so
+	 * every entry is removed.
+	 */
+	hash_seq_init(&scan, LocatorFilterCache);
+	for (;;)
+	{
+		victim = (LocatorFilterEntry *) hash_seq_search(&scan);
+		if (victim == NULL)
+			break;
+
+		if (hash_search(LocatorFilterCache, &victim->relfileocator,
+						HASH_REMOVE, NULL) != NULL)
+			continue;
+
+		elog(ERROR, "hash table corrupted");
+	}
+}
+
+
+/*
+ * Relcache invalidation callback for the locator filter cache.
+ *
+ * A relid of InvalidOid signals a complete reset and removes every entry.
+ * Otherwise the entries built for that relation are removed, together with
+ * all negative entries (those whose relid is InvalidOid).
+ */
+static void
+locator_filter_invalidate_relcache_cb(Datum arg, Oid relid)
+{
+	HASH_SEQ_STATUS scan;
+	LocatorFilterEntry *cur;
+	bool		reset_all = (relid == InvalidOid);
+
+	/* Nothing to flush if the cache does not exist. */
+	if (LocatorFilterCache == NULL)
+		return;
+
+	/* Walk the cache and remove whatever this invalidation affects. */
+	hash_seq_init(&scan, LocatorFilterCache);
+	while ((cur = (LocatorFilterEntry *) hash_seq_search(&scan)) != NULL)
+	{
+		/*
+		 * Keep an entry only if this is not a complete reset, the entry is
+		 * not a negative one, and it was built for some other relation.
+		 */
+		if (!reset_all &&
+			cur->relid != InvalidOid &&
+			cur->relid != relid)
+			continue;
+
+		if (hash_search(LocatorFilterCache,
+						&cur->relfileocator,
+						HASH_REMOVE,
+						NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d2b35cfb96..8948fc56d7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -56,6 +56,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static void pgoutput_table_filter(LogicalDecodingContext *ctx,
+								  Relation relation,
+								  PublicationActions *pubactions);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -258,6 +261,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_by_table_cb = pgoutput_table_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -1414,8 +1418,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
+	/*
+	 * The early table filter is an optimization only: it lets changes through
+	 * unfiltered whenever ReorderBufferFilterByLocator() returns false without
+	 * consulting the plugin, so unpublishable relations can still show up
+	 * here and must be skipped at run time.
+	 */
 	if (!is_publishable_relation(relation))
 		return;
 
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
@@ -1683,6 +1686,27 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Table filter callback: store the actions published for relation in
+ * *pubactions (all false when the relation cannot be published at all).
+ */
+static void
+pgoutput_table_filter(LogicalDecodingContext *ctx,
+					  Relation relation,
+					  PublicationActions *pubactions)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	PublicationActions result = {0};
+
+	/* Unpublishable relations keep the all-false default. */
+	if (is_publishable_relation(relation))
+		result = get_rel_sync_entry(data, relation)->pubactions;
+
+	if (pubactions)
+		*pubactions = result;
+}
+
+
 /*
  * Shutdown the output plugin.
  *
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..f6beecfe49 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -145,6 +145,9 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern void filter_by_table_cb_wrapper(LogicalDecodingContext *ctx,
+										Relation relation,
+										PublicationActions *pubactions);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 44988ebdd8..7b051b47eb 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -10,6 +10,7 @@
 #define OUTPUT_PLUGIN_H
 
 #include "replication/reorderbuffer.h"
+#include "catalog/pg_publication.h"
 
 struct LogicalDecodingContext;
 struct OutputPluginCallbacks;
@@ -96,6 +97,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 											   RepOriginId origin_id);
 
+/*
+ * Filter by table: set *pubactions to the actions published for relation.
+ */
+typedef void (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
+											Relation relation,
+											PublicationActions *pubactions);
+
 /*
  * Called to shutdown an output plugin.
  */
@@ -222,6 +230,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterByRelCB filter_by_table_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 0b2c95f7aa..f1dd145f3a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -730,5 +730,6 @@ extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
 extern void ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr);
 
 extern void StartupReorderBuffer(void);
+extern bool ReorderBufferFilterByLocator(ReorderBuffer *rb, TransactionId xid, RelFileLocator *relfileocator, ReorderBufferChangeType action, XLogRecPtr lsn);
 
 #endif
diff --git a/src/test/subscription/t/034_table_filter.pl b/src/test/subscription/t/034_table_filter.pl
new file mode 100644
index 0000000000..0b329645c2
--- /dev/null
+++ b/src/test/subscription/t/034_table_filter.pl
@@ -0,0 +1,89 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Test that changes to unpublished tables are filtered out during decoding
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node; DEBUG1 logging exposes the filter message checked below
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->append_conf('postgresql.conf', 'log_min_messages = DEBUG1');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_publisher->safe_psql('postgres',
+	"create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;");
+# Helper: wait, at most 180 tries, until mysub reports replay_lsn = write_lsn.
+$node_publisher->safe_psql('postgres',
+qq(
+CREATE OR REPLACE FUNCTION check_replication_status() RETURNS VOID AS \$\$
+DECLARE
+    replication_record pg_stat_replication;
+BEGIN
+    FOR attempt IN 1..180 LOOP
+        SELECT *
+        INTO replication_record
+        FROM pg_stat_replication
+        WHERE application_name = 'mysub';
+        IF replication_record.replay_lsn = replication_record.write_lsn THEN
+            RETURN;
+        END IF;
+        PERFORM pg_sleep(1);
+    END LOOP;
+    RAISE EXCEPTION 'subscription mysub did not catch up in time';
+END;
+\$\$ LANGUAGE plpgsql;));
+
+# Create some preexisting content on subscriber
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres',
+	"create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+	"create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
+
+# test history snapshot: rows inserted after the truncate must be replicated
+$node_publisher->safe_psql('postgres',
+	"insert into tbl_pub select i, 'xyzzy', 'abcba', 'truncated' from generate_series(1,9) i;truncate tbl_pub;");
+$node_publisher->safe_psql('postgres',
+	"insert into tbl_pub select i, 'xyzzy', 'abcba', 'truncated' from generate_series(1,9) i;");
+$node_publisher->wait_for_catchup('mysub');
+my $sub_count = $node_subscriber->safe_psql('postgres', "select count(*) from tbl_pub;");
+is($sub_count, 9, 'check that the historical snapshot is correct.');
+
+# test table change filter
+my $logstart = -s $node_publisher->logfile;
+$node_publisher->safe_psql('postgres',
+qq(BEGIN;
+insert into tbl_t1 select 1, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',i),(select sum(size) from pg_ls_replslotdir('mysub')) from generate_series(2,99) i;
+update tbl_t1 set val2 = repeat('xyzzy',id) where id > 1 and id < 10001;
+select check_replication_status();
+insert into tbl_t1 select 10001, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+COMMIT;));
+# Decoding is asynchronous, so wait for the walsender before checking its log.
+$node_publisher->wait_for_catchup('mysub');
+my $filter_table_oid =  $node_publisher->safe_psql('postgres', "select oid from pg_class where relname='tbl_t1';");
+ok($node_publisher->log_contains("logical filter change by table " . $filter_table_oid, $logstart),
+	"the change of the tbl_t1 table is filtered.");
+
+done_testing();
\ No newline at end of file
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 95ae7845d8..54cfb7e45e 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1394,6 +1394,7 @@ LOCALLOCK
 LOCALLOCKOWNER
 LOCALLOCKTAG
 LOCALPREDICATELOCK
+LocatorFilterEntry
 LOCK
 LOCKMASK
 LOCKMETHODID
-- 
2.39.3

