From b4ac04d5fd7e87ee199f996dfda96305a6dac967 Mon Sep 17 00:00:00 2001
From: "adger.lj" <adger.lj@alibaba-inc.com>
Date: Fri, 1 Dec 2023 13:58:33 +0800
Subject: [PATCH] Filter irrelevant change before reassemble transactions
 during logical decoding

In order to reduce unnecessary logical decoding, irrelevant relationship
changes can be filtered out before reorganizing transaction fragments.
This can effectively reduce useless changes and prevent disk space from
being filled up with irrelevant data.

By design, Added a callback LogicalDecodeFilterByRelCB for the output plugin.
We implemented a function pgoutput_table_filter for pgoutput. And RelationSyncCache
is reused to determine whether a relationship-related change should be filtered out.

Referring to the implementation of the function pgoutput_change, currently only
insert/update/delete is can filtered, and other types of changes are not considered.
Perhaps more detailed analysis can be done and more filters can be filtered.

The filtering in pgoutput_change and ReorderBufferProcessTXN is no longer needed,
but is retained for now. Here's what to do next.
---
 src/backend/replication/logical/decode.c    | 156 +++++++++++++++++++-
 src/backend/replication/logical/logical.c   |  31 ++++
 src/backend/replication/pgoutput/pgoutput.c |  55 +++++++
 src/include/replication/logical.h           |   2 +
 src/include/replication/output_plugin.h     |   8 +
 src/test/subscription/t/034_table_filter.pl |  91 ++++++++++++
 6 files changed, 339 insertions(+), 4 deletions(-)
 create mode 100644 src/test/subscription/t/034_table_filter.pl

diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 1237118e84..ae28d2145b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -35,6 +35,7 @@
 #include "access/xlogrecord.h"
 #include "access/xlogutils.h"
 #include "catalog/pg_control.h"
+#include "common/string.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
@@ -42,6 +43,7 @@
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
 #include "storage/standby.h"
+#include "utils/relfilenumbermap.h"
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -155,7 +157,7 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		case XLOG_PARAMETER_CHANGE:
 			{
 				xl_parameter_change *xlrec =
-					(xl_parameter_change *) XLogRecGetData(buf->record);
+				(xl_parameter_change *) XLogRecGetData(buf->record);
 
 				/*
 				 * If wal_level on the primary is reduced to less than
@@ -581,6 +583,137 @@ FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return filter_by_origin_cb_wrapper(ctx, origin_id);
 }
 
+static bool
+FilterByTable(LogicalDecodingContext *ctx, ReorderBufferChange *change)
+{
+	ReorderBuffer *rb = ctx->reorder;
+	Relation	relation = NULL;
+	Oid			reloid;
+	bool		result = false;
+	bool		using_subtxn;
+
+	if (ctx->callbacks.filter_by_origin_cb == NULL)
+		return false;
+
+	switch (change->action)
+	{
+			/* intentionally fall through */
+		case REORDER_BUFFER_CHANGE_INSERT:
+		case REORDER_BUFFER_CHANGE_UPDATE:
+		case REORDER_BUFFER_CHANGE_DELETE:
+			break;
+		default:
+			return false;
+	}
+
+	/*
+	 * Decoding needs access to syscaches et al., which in turn use
+	 * heavyweight locks and such. Thus we need to have enough state around to
+	 * keep track of those.  The easiest way is to simply use a transaction
+	 * internally.  That also allows us to easily enforce that nothing writes
+	 * to the database by checking for xid assignments.
+	 *
+	 * When we're called via the SQL SRF there's already a transaction
+	 * started, so start an explicit subtransaction there.
+	 */
+	using_subtxn = IsTransactionOrTransactionBlock();
+
+	if (using_subtxn)
+		BeginInternalSubTransaction("filter change by table");
+	else
+		StartTransactionCommand();
+
+	reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
+								  change->data.tp.rlocator.relNumber);
+	if (reloid == InvalidOid)
+		goto filter_done;
+
+	relation = RelationIdGetRelation(reloid);
+
+	if (!RelationIsValid(relation))
+		elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+			 reloid,
+			 relpathperm(change->data.tp.rlocator,
+						 MAIN_FORKNUM));
+
+	if (!RelationIsLogicallyLogged(relation))
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	/*
+	 * Ignore temporary heaps created during DDL unless the plugin has asked
+	 * for them.
+	 */
+	if (relation->rd_rel->relrewrite && !rb->output_rewrites)
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	/*
+	 * For now ignore sequence changes entirely. Most of the time they don't
+	 * log changes using records we understand, so it doesn't make sense to
+	 * handle the few cases we do.
+	 */
+	if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
+	{
+		result = true;
+		goto filter_done;
+	}
+
+	if (IsToastRelation(relation))
+	{
+		Oid			real_reloid = InvalidOid;
+
+		/* pg_toast_ len is 9 */
+		char	   *toast_name = RelationGetRelationName(relation);
+		char	   *start_ch = &toast_name[9];
+
+		real_reloid = strtoint(start_ch, NULL, 10);
+
+		if (real_reloid == InvalidOid)
+			goto filter_done;
+
+		RelationClose(relation);
+
+		relation = RelationIdGetRelation(real_reloid);
+
+		if (!RelationIsValid(relation))
+			elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
+				 reloid,
+				 relpathperm(change->data.tp.rlocator,
+							 MAIN_FORKNUM));
+	}
+
+	result = filter_by_table_cb_wrapper(ctx, relation, change);
+
+filter_done:
+
+	if (result && RelationIsValid(relation))
+		elog(LOG, "logical filter change by table %s", RelationGetRelationName(relation));
+
+	/* this is just a sanity check against bad output plugin behaviour */
+	if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
+		elog(ERROR, "output plugin used XID %u",
+			 GetCurrentTransactionId());
+
+	if (RelationIsValid(relation))
+	{
+		RelationClose(relation);
+	}
+
+	AbortCurrentTransaction();
+	if (using_subtxn)
+		RollbackAndReleaseCurrentSubTransaction();
+
+	if (result)
+		ReorderBufferReturnChange(rb, change, false);
+
+	return result;
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for LogicalDecodingProcessRecord().
  */
@@ -940,6 +1073,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change,
 							 xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
@@ -1009,6 +1145,9 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
 }
@@ -1065,6 +1204,9 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
 }
@@ -1201,11 +1343,14 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 		else
 			change->data.tp.clear_toast_afterwards = false;
 
-		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
-								 buf->origptr, change, false);
-
 		/* move to the next xl_multi_insert_tuple entry */
 		data += datalen;
+
+		if (FilterByTable(ctx, change))
+			continue;;
+
+		ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+								 buf->origptr, change, false);
 	}
 	Assert(data == tupledata + tuplelen);
 }
@@ -1240,6 +1385,9 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	change->data.tp.clear_toast_afterwards = true;
 
+	if (FilterByTable(ctx, change))
+		return;
+
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
 							 change, false);
 }
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 8288da5277..f5da5199dc 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1225,6 +1225,37 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+bool
+filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, ReorderBufferChange *change)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+	bool		ret;
+
+	Assert(!ctx->fast_forward);
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "filter_by_table";
+	state.report_location = InvalidXLogRecPtr;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = false;
+	ctx->end_xact = false;
+
+	/* do the actual work: call callback */
+	ret = ctx->callbacks.filter_by_table_cb(ctx, relation, change);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+
+	return ret;
+}
+
 static void
 message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				   XLogRecPtr message_lsn, bool transactional,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index f9ed1083df..54a04b54e7 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -57,6 +57,9 @@ static void pgoutput_message(LogicalDecodingContext *ctx,
 							 Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
 								   RepOriginId origin_id);
+static bool pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+								  Relation relation,
+								  ReorderBufferChange *change);
 static void pgoutput_begin_prepare_txn(LogicalDecodingContext *ctx,
 									   ReorderBufferTXN *txn);
 static void pgoutput_prepare_txn(LogicalDecodingContext *ctx,
@@ -259,6 +262,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_prepared_cb = pgoutput_commit_prepared_txn;
 	cb->rollback_prepared_cb = pgoutput_rollback_prepared_txn;
 	cb->filter_by_origin_cb = pgoutput_origin_filter;
+	cb->filter_by_table_cb = pgoutput_table_filter;
 	cb->shutdown_cb = pgoutput_shutdown;
 
 	/* transaction streaming */
@@ -1679,6 +1683,57 @@ pgoutput_origin_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+/*
+ * Return true if the relation has not been published, false otherwise.
+ */
+static bool
+pgoutput_table_filter(struct LogicalDecodingContext *ctx,
+					  Relation relation,
+					  ReorderBufferChange *change)
+{
+	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+	RelationSyncEntry *relentry;
+	ReorderBufferChangeType action = change->action;
+
+	if (!is_publishable_relation(relation))
+		return true;
+
+	relentry = get_rel_sync_entry(data, relation);
+
+	/* First check the table filter */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
+			if (!relentry->pubactions.pubinsert)
+				return true;
+			break;
+		case REORDER_BUFFER_CHANGE_UPDATE:
+			if (!relentry->pubactions.pubupdate)
+				return true;
+			break;
+		case REORDER_BUFFER_CHANGE_DELETE:
+			if (!relentry->pubactions.pubdelete)
+				return true;
+
+			/*
+			 * This is only possible if deletes are allowed even when replica
+			 * identity is not defined for a table. Since the DELETE action
+			 * can't be published, we simply return.
+			 */
+			if (!change->data.tp.oldtuple)
+			{
+				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
+				return true;
+			}
+			break;
+		default:
+			Assert(false);
+	}
+
+	return false;
+}
+
+
 /*
  * Shutdown the output plugin.
  *
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dffc0d1564..452425ba5c 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -145,6 +145,8 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
 									  TransactionId xid, const char *gid);
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern bool filter_by_table_cb_wrapper(LogicalDecodingContext *ctx, Relation relation, 
+										ReorderBufferChange *change);
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
 
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2ffcf17505..4c885639d7 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -96,6 +96,13 @@ typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
 typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
 											   RepOriginId origin_id);
 
+/*
+ * Filter changes by table.
+ */
+typedef bool (*LogicalDecodeFilterByRelCB) (struct LogicalDecodingContext *ctx,
+											Relation relation,
+									  		ReorderBufferChange *change);
+
 /*
  * Called to shutdown an output plugin.
  */
@@ -222,6 +229,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
+	LogicalDecodeFilterByRelCB filter_by_table_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 
 	/* streaming of changes at prepare time */
diff --git a/src/test/subscription/t/034_table_filter.pl b/src/test/subscription/t/034_table_filter.pl
new file mode 100644
index 0000000000..8d4f962adc
--- /dev/null
+++ b/src/test/subscription/t/034_table_filter.pl
@@ -0,0 +1,91 @@
+# Copyright (c) 2021-2023, PostgreSQL Global Development Group
+
+# Basic logical replication test
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Initialize publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->append_conf('postgresql.conf', 'logical_decoding_work_mem = 64kB');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init;
+$node_subscriber->start;
+
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+	"create table tbl_pub(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+	"create table tbl_t1(id int, val1 text, val2 text,size int);");
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION mypub FOR TABLE public.tbl_pub;");
+$node_publisher->safe_psql('postgres',
+qq(
+CREATE OR REPLACE FUNCTION check_replication_status() RETURNS VOID AS \$\$
+DECLARE
+    replication_record pg_stat_replication;
+BEGIN
+    LOOP
+        SELECT *
+        INTO replication_record
+        FROM pg_stat_replication
+        WHERE application_name = 'mysub';
+        
+        IF replication_record.replay_lsn = replication_record.write_lsn THEN
+            EXIT;
+        END IF;
+    
+        PERFORM pg_sleep(1);
+    END LOOP;
+END;
+\$\$ LANGUAGE plpgsql;));
+
+# Create some preexisting content on subscriber
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_subscriber->safe_psql('postgres', 
+    "create table tbl_pub(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+    "create table tbl_t1(id int, val1 text, val2 text,size text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION mysub CONNECTION '$publisher_connstr' PUBLICATION mypub"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'mysub');
+
+# test filter
+$node_publisher->safe_psql('postgres',
+qq(BEGIN;
+insert into tbl_t1 select 1, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+insert into tbl_t1 select i,repeat('xyzzy', i),repeat('abcba',i),(select sum(size) from pg_ls_replslotdir('mysub')) from generate_series(2,9999) i;
+update tbl_t1 set val2 = repeat('xyzzy',id) where id > 1 and id < 10001;
+select check_replication_status();
+insert into tbl_t1 select 10001, 'xyzzy', 'abcba', sum(size) from pg_ls_replslotdir('mysub');
+COMMIT;)
+);
+
+my $minsize =   $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 order by size asc limit 1;");
+my $maxsize =   $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 order by size desc limit 1;");
+is($minsize, $maxsize, 'check decode filter table between maxsize and minsize');
+
+
+my $fristrow =   $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 where id = 1;");
+my $lastrow =   $node_publisher->safe_psql('postgres',
+	"select size from tbl_t1 where id = 10001;");
+is($minsize, $maxsize, 'check decode filter table between fristrow and lastrow');
+
+is($minsize, $lastrow, 'check decode filter table between minsize and lastrow');
+
+print "minsize: " . $minsize . "maxsize: " . $maxsize ."fristrow: " . $fristrow ."lastrow: " . $lastrow . "\n";
+
+done_testing();
\ No newline at end of file
-- 
2.39.3

