On Fri, Dec 3, 2021 at 3:21 PM vignesh C <vignes...@gmail.com> wrote:
>
> On Fri, Dec 3, 2021 at 1:13 PM Michael Paquier <mich...@paquier.xyz> wrote:
> >
> > On Thu, Aug 26, 2021 at 09:00:39PM +0530, vignesh C wrote:
> > > The previous patch was failing because of the recent test changes made
> > > by commit 201a76183e2 which unified new and get_new_node, attached
> > > patch has the changes to handle the changes accordingly.
> >
> > Please note that the CF app is complaining about this patch, so a
> > rebase is required.  I have moved it to next CF, waiting on author,
> > for now.
>
> Thanks for letting me know, I have rebased it on top of HEAD, the
> attached v2 version has the rebased changes.

The patch was not applying on top of the HEAD, attached v3 version
which has the rebased changes.

Regards,
Vignesh
From ad79b95e9362239e32cc597aeac1d6e22aaa7504 Mon Sep 17 00:00:00 2001
From: Vigneshwaran C <vignes...@gmail.com>
Date: Sat, 12 Mar 2022 13:04:55 +0530
Subject: [PATCH v3] Fix for invalidating logical replication relations when
 there is a change in schema.

When the schema gets changed, the rel sync cache invalidation was not
happening, fixed it by adding a callback for schema change.
---
 src/backend/replication/pgoutput/pgoutput.c | 44 ++++++++++++
 src/test/subscription/t/001_rep_changes.pl  | 74 +++++++++++++++++++++
 2 files changed, 118 insertions(+)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index ea57a0477f..2ce634a90b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -176,6 +176,8 @@ static RelationSyncEntry *get_rel_sync_entry(PGOutputData *data,
 static void rel_sync_cache_relation_cb(Datum arg, Oid relid);
 static void rel_sync_cache_publication_cb(Datum arg, int cacheid,
 										  uint32 hashvalue);
+static void rel_sync_cache_namespace_cb(Datum arg, int cacheid,
+										uint32 hashvalue);
 static void set_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
 											TransactionId xid);
 static bool get_schema_sent_in_streamed_txn(RelationSyncEntry *entry,
@@ -1658,6 +1660,9 @@ init_rel_sync_cache(MemoryContext cachectx)
 	CacheRegisterSyscacheCallback(PUBLICATIONNAMESPACEMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_namespace_cb,
+								  (Datum) 0);
 }
 
 /*
@@ -1989,6 +1994,45 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 	}
 }
 
+/*
+ * Namespace syscache invalidation callback
+ */
+static void
+rel_sync_cache_namespace_cb(Datum arg, int cacheid, uint32 hashvalue)
+{
+	HASH_SEQ_STATUS status;
+	RelationSyncEntry *entry;
+
+	/*
+	 * We can get here if the plugin was used in SQL interface as the
+	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
+	 * is no way to unregister the relcache invalidation callback.
+	 */
+	if (RelationSyncCache == NULL)
+		return;
+
+	hash_seq_init(&status, RelationSyncCache);
+	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
+	{
+		/*
+		 * Reset schema sent status as the relation definition may have changed.
+		 * Also free any objects that depended on the earlier definition.
+		 */
+		entry->schema_sent = false;
+		list_free(entry->streamed_txns);
+		entry->streamed_txns = NIL;
+
+		if (entry->attrmap)
+			free_attrmap(entry->attrmap);
+		entry->attrmap = NULL;
+
+		if (hash_search(RelationSyncCache,
+				(void *) &entry->relid,
+				HASH_REMOVE, NULL) == NULL)
+			elog(ERROR, "hash table corrupted");
+	}
+}
+
 /*
  * Publication relation/schema map syscache invalidation callback
  *
diff --git a/src/test/subscription/t/001_rep_changes.pl b/src/test/subscription/t/001_rep_changes.pl
index af0cff6a30..4b7b9e54ff 100644
--- a/src/test/subscription/t/001_rep_changes.pl
+++ b/src/test/subscription/t/001_rep_changes.pl
@@ -520,6 +520,80 @@ is($result, qq(0), 'check replication origin was dropped on subscriber');
 $node_subscriber->stop('fast');
 $node_publisher->stop('fast');
 
+# Test schema invalidation by renaming the schema
+# Create tables on publisher
+# Initialize publisher node
+my $node_publisher1 = PostgreSQL::Test::Cluster->new('publisher1');
+$node_publisher1->init(allows_streaming => 'logical');
+$node_publisher1->start;
+
+# Create subscriber node
+my $node_subscriber1 = PostgreSQL::Test::Cluster->new('subscriber1');
+$node_subscriber1->init(allows_streaming => 'logical');
+$node_subscriber1->start;
+
+my $publisher1_connstr = $node_publisher1->connstr . ' dbname=postgres';
+
+$node_publisher1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber1->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber1->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will only be used for this test
+$node_publisher1->safe_psql('postgres',
+        "CREATE PUBLICATION tap_pub_sch FOR ALL TABLES"
+);
+$node_subscriber1->safe_psql('postgres',
+        "CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher1_connstr' PUBLICATION tap_pub_sch"
+);
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+
+# Also wait for initial table sync to finish
+$synced_query =
+  "SELECT count(1) = 0 FROM pg_subscription_rel WHERE srsubstate NOT IN ('r', 's');";
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+$node_publisher1->safe_psql('postgres',
+        "begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_publisher1->wait_for_catchup('tap_sub_sch');
+$node_subscriber1->poll_query_until('postgres', $synced_query)
+  or die "Timed out while waiting for subscriber to synchronize data";
+
+# Subscriber should not receive the inserted row for renamed schema
+$result =
+  $node_subscriber1->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is($result, qq(1
+2), 'check rows on subscriber after schema invalidation');
+
+# Drop subscription as we don't need it anymore
+$node_subscriber1->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_sch");
+
+# Drop publications as we don't need them anymore
+$node_publisher1->safe_psql('postgres', "DROP PUBLICATION tap_pub_sch");
+
+# Clean up the tables on both publisher and subscriber as we don't need them
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_publisher1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch1 cascade");
+$node_subscriber1->safe_psql('postgres', "DROP SCHEMA sch2 cascade");
+
+$node_subscriber1->stop('fast');
+$node_publisher1->stop('fast');
+
 # CREATE PUBLICATION while wal_level=minimal should succeed, with a WARNING
 $node_publisher->append_conf(
 	'postgresql.conf', qq(
-- 
2.32.0

Reply via email to