From 60ecc56984f0c2105a9eee24f2f73d05ee6ff1f3 Mon Sep 17 00:00:00 2001
From: "adger.lj" <adger.lj@alibaba-inc.com>
Date: Thu, 7 Dec 2023 15:51:12 +0800
Subject: [PATCH] Filter out unnecessary changes before reassembly during
 logical replication

To reduce unnecessary work during logical replication, changes for irrelevant
relations can be filtered out before transactions are reassembled in the
reorder buffer. This effectively reduces the number of useless changes and
prevents storage space from being filled up with irrelevant data.

By design, this patch adds a callback, LogicalDecodeFilterByRelCB, to the
output plugin interface and implements pgoutput_table_filter for pgoutput.
The existing RelationSyncCache is reused to decide whether a change for a
given relation should be filtered out. Following the implementation of
pgoutput_change, only insert/update/delete changes are currently filtered;
other change types are not considered. With more detailed analysis, further
change types could be filtered in the future.
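
For reference, the callback type added to output_plugin.h and its registration
in pgoutput (both part of the diff below) have roughly this shape; returning
true means the change is filtered out and never queued:

    /* Filter changes by table; return true to skip the change. */
    typedef bool (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
                                                Relation relation,
                                                ReorderBufferChange *change);

    /* registered in _PG_output_plugin_init() */
    cb->filter_by_table_cb = pgoutput_table_filter;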

Most of the code in the FilterByTable function is adapted from
ReorderBufferProcessTXN; it is called before ReorderBufferQueueChange and
wraps the filter_by_table_cb callback through a new wrapper in logical.c.

In short, this patch consolidates the change filtering previously done in
ReorderBufferProcessTXN and pgoutput_change into FilterByTable, which is
invoked before ReorderBufferQueueChange.
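
Schematically, each affected decode routine (DecodeInsert, DecodeUpdate,
DecodeDelete, DecodeMultiInsert, DecodeSpecConfirm) now filters before
queuing, along the lines of this simplified sketch of the decode.c changes:

    if (FilterByTable(ctx, change))
        return;		/* change was filtered and returned to the reorder buffer */

    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
                             change, false);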
---
 src/backend/replication/logical/decode.c    | 153 +++++++++++++++++++-
 src/backend/replication/logical/logical.c   |  31 ++++
 src/backend/replication/pgoutput/pgoutput.c |  89 +++++++-----
 src/include/replication/logical.h           |   2 +
 src/include/replication/output_plugin.h     |   7 +
 src/test/subscription/t/034_table_filter.pl |  91 ++++++++++++
 6 files changed, 335 insertions(+), 38 deletions(-)
 create mode 100644 src/test/subscription/t/034_table_filter.pl

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index b3f8f908d1..47cda524bd 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,6 +35,7 @@
 #include "access/xlogrecord.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "common/string.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
@@ -42,6 +43,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "utils/relfilenumbermap.h"
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -155,7 +157,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_PARAMETER_CHANGE:
 			{
 				xl_parameter_change *xlrec =
-					(xl_parameter_change *) XLogRecGetData(buf->record);
+				(xl_parameter_change *) XLogRecGetData(buf->record);
 
 				/*
 				 * If wal_level on the primary is reduced to less than
@@ -581,6 +583,134 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return filter_by_origin_cb_wrapper(ctx, origin_id);
 }
 
+static bool
+FilterByTable(LogicalDecodingContext *ctx, ReorderBufferChange *change)
+{
+	ReorderBuffer *rb = ctx->reorder;
+	Relation	relation = NULL;
+	Oid			reloid;
+	bool		result = false;
+	bool		using_subtxn;
+
+	if (ctx->callbacks.filter_by_table_cb == NULL)
+		return false;
+
+	switch (change->action)
+	{
+			/* intentionally fall through */
+		case REORDER_BUFFER_CHANGE_INSERT:
+		case REORDER_BUFFER_CHANGE_UPDATE:
+		case REORDER_BUFFER_CHANGE_DELETE:
+			break;
+		default:
+			return false;
+	}
+
+	/*
+	 * Decoding needs access to syscaches et al., which in turn use
+	 * heavyweight locks and such. Thus we need to have enough state around to
+	 * keep track of those.  The easiest way is to simply use a transaction
+	 * internally.  That also allows us to easily enforce that nothing writes
+	 * to the database by checking for xid assignments.
+	 *
+	 * When we're called via the SQL SRF there's already a transaction
+	 * started, so start an explicit subtransaction there.
+	 */
+	using_subtxn = IsTransactionOrTransactionBlock();
+
+	if (using_subtxn)
+		BeginInternalSubTransaction("filter change by table");
+	else
+		StartTransactionCommand();
+
+	reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
+								  change->data.tp.rlocator.relNumber);
+	if (reloid == InvalidOid)
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	relation = RelationIdGetRelation(reloid);
+
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+			 reloid,
+			 relpathperm(change->data.tp.rlocator,
+						 MAIN_FORKNUM));
+
+	if (!RelationIsLogicallyLogged(relation))
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	/*
+	 * Ignore temporary heaps created during DDL unless the plugin has asked
+	 * for them.
+	 */
+	if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	/*
+	 * For now ignore sequence changes entirely. Most of the time they don't
+	 * log changes using records we understand, so it doesn't make sense to
+	 * handle the few cases we do.
+	 */
+	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	if (IsToastRelation(relation))
+	{
+		Oid			real_reloid = InvalidOid;
+
+		/* skip the "pg_toast_" prefix (9 characters) to get the parent table's OID */
+		char	   *toast_name = RelationGetRelationName(relation);
+		char	   *start_ch = &toast_name[9];
+
+		real_reloid = strtoint(start_ch, NULL, 10);
+
+		if (real_reloid == InvalidOid)
+			elog(ERROR, "cannot get the real table oid for toast table %s, error: %m", toast_name);
+
+		RelationClose(relation);
+
+		relation = RelationIdGetRelation(real_reloid);
+
+		if (!RelationIsValid(relation))
+			elog(ERROR, "could not open real relation with OID %u (for toast table filenumber \"%s\")",
+				 real_reloid,
+				 relpathperm(change->data.tp.rlocator,
+							 MAIN_FORKNUM));
+	}
+
+	result = filter_by_table_cb_wrapper(ctx, relation, change);
+
+filter_done:
+
+	if (result && RelationIsValid(relation))
+		elog(DEBUG1, "logical filter change by table %s", RelationGetRelationName(relation));
+
+	if (RelationIsValid(relation))
+		RelationClose(relation);
+
+	if (using_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+	else
+		AbortCurrentTransaction();
+
+	if (result)
+		ReorderBufferReturnChange(rb, change, false);
+
+	return result;
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
  */
@@ -940,6 +1070,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change,
 							 xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
@@ -1009,6 +1142,9 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
 }
@@ -1065,6 +1201,9 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
 }
@@ -1201,11 +1340,14 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		else
 			change->data.tp.clear_toast_afterwards = false;
 
-		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
-								 buf->origptr, change, false);
-
 		/* move to the next xl_multi_insert_tuple entry */
 		data += datalen;
+
+		if (FilterByTable(ctx, change))
+			continue;
+
+		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+								 buf->origptr, change, false);
 	}
 	Assert(data == tupledata + tuplelen);
 }
@@ -1240,6 +1382,9 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index ca09c683f1..8bd0cefb22 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1225,6 +1225,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+bool
+filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, ReorderBufferChange *change)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_by_table";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->end_xact = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_by_table_cb(ctx, relation, change);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 425238187f..2966258ea3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,6 +57,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+								  Relation relation,
+								  ReorderBufferChange *change);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -259,6 +262,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_by_table_cb = pgoutput_table_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -1415,9 +1419,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
-	if (!is_publishable_relation(relation))
-		return;
-
 	/*
 	 * Remember the xid for the change in streaming mode. We need to send xid
 	 * with each change in the streaming mode so that subscriber can make
@@ -1428,37 +1429,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		xid = change->txn->xid;
 
 	relentry = get_rel_sync_entry(data, relation);
-
-	/* First check the table filter */
-	switch (action)
-	{
-		case REORDER_BUFFER_CHANGE_INSERT:
-			if (!relentry->pubactions.pubinsert)
-				return;
-			break;
-		case REORDER_BUFFER_CHANGE_UPDATE:
-			if (!relentry->pubactions.pubupdate)
-				return;
-			break;
-		case REORDER_BUFFER_CHANGE_DELETE:
-			if (!relentry->pubactions.pubdelete)
-				return;
-
-			/*
-			 * This is only possible if deletes are allowed even when replica
-			 * identity is not defined for a table. Since the DELETE action
-			 * can't be published, we simply return.
-			 */
-			if (!change->data.tp.oldtuple)
-			{
-				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
-				return;
-			}
-			break;
-		default:
-			Assert(false);
-	}
-
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
@@ -1684,6 +1654,57 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Return true if the relation is not published, false otherwise.
+ */
+static bool
+pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+					  Relation relation,
+					  ReorderBufferChange *change)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	RelationSyncEntry *relentry;
+	ReorderBufferChangeType action = change->action;
+
+	if (!is_publishable_relation(relation))
+		return true;
+
+	relentry = get_rel_sync_entry(data, relation);
+
+	/* First check the table filter */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			if (!relentry->pubactions.pubinsert)
+				return true;
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (!relentry->pubactions.pubupdate)
+				return true;
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (!relentry->pubactions.pubdelete)
+				return true;
+
+			/*
+			 * This is only possible if deletes are allowed even when replica
+			 * identity is not defined for a table. Since the DELETE action
+			 * can't be published, we simply return.
+			 */
+			if (!change->data.tp.oldtuple)
+			{
+				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+				return true;
+			}
+			break;
+		default:
+			Assert(false);
+	}
+
+	return false;
+}
+
+
 /*
  * Shutdown the output plugin.
  *
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dc2df4ce92..30a16cd784 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -145,6 +145,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation,
+										ReorderBufferChange *change);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 44988ebdd8..030eb5afb7 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -96,6 +96,12 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 											   RepOriginId origin_id);
 
+/*
+ * Filter changes by table. Return true to skip the change.
+ */
+typedef bool (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
+											Relation relation, ReorderBufferChange *change);
+
 /*
  * Called to shutdown an output plugin.
  */
@@ -222,6 +228,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterByRelCB filter_by_table_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/test/subscription/t/034_table_filter.pl b/src/test/subscription/t/034_table_filter.pl
new file mode 100644
index 0000000000..8d4f962adc
--- /dev/null
+++ b/src/test/subscription/t/034_table_filter.pl
@@ -0,0 +1,91 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Test filtering of changes for non-published tables during logical decoding
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"create table tbl_pub(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+	"create table tbl_t1(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;");
+$node_publisher->safe_psql('postgres',
+qq(
+CREATE OR REPLACE FUNCTION check_replication_status() RETURNS VOID AS \$\$
+DECLARE
+    replication_record pg_stat_replication;
+BEGIN
+    LOOP
+        SELECT *
+        INTO replication_record
+        FROM pg_stat_replication
+        WHERE application_name = 'mysub';
+        
+        IF replication_record.replay_lsn = replication_record.write_lsn THEN
+            EXIT;
+        END IF;
+    
+        PERFORM pg_sleep(1);
+    END LOOP;
+END;
+\$\$ LANGUAGE plpgsql;));
+
+# Create some preexisting content on subscriber
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres', 
+    "create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+    "create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
+
+# test filter
+$node_publisher->safe_psql('postgres',
+qq(BEGIN;
+insert into tbl_t1 select 1, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',i),(select sum(size) from pg_ls_replslotdir('mysub')) from generate_series(2,9999) i;
+update tbl_t1 set val2 = repeat('xyzzy',id) where id > 1 and id < 10001;
+select check_replication_status();
+insert into tbl_t1 select 10001, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+COMMIT;)
+);
+
+my $minsize = $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 order by size asc limit 1;");
+my $maxsize = $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 order by size desc limit 1;");
+is($minsize, $maxsize, 'check decode filter table between maxsize and minsize');
+
+
+my $firstrow = $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 where id = 1;");
+my $lastrow = $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 where id = 10001;");
+is($firstrow, $lastrow, 'check decode filter table between firstrow and lastrow');
+
+is($minsize, $lastrow, 'check decode filter table between minsize and lastrow');
+
+note "minsize: $minsize maxsize: $maxsize firstrow: $firstrow lastrow: $lastrow";
+
+done_testing();
\ No newline at end of file
-- 
2.39.3

