vignesh C <vignes...@gmail.com> writes:
> On Thu, 5 Jan 2023 at 03:17, Tom Lane <t...@sss.pgh.pa.us> wrote:
>> The bigger picture here though is that in examples such as the one
>> you gave at the top of the thread, it's not very clear to me that
>> there's *any* principled behavior.  If the connection between publisher
>> and subscriber tables is only the relation name, fine ... but exactly
>> which relation name applies?

> The connection between publisher and subscriber table is based on
> relation id, During the first change relid, relname and schema name
> from publisher will be sent to the subscriber. Subscriber stores these
> id, relname and schema name in the LogicalRepRelMap hash for which
> relation id is the key. Subsequent data received in the subscriber
> will use the relation id received from the publisher and apply the
> changes in the subscriber.

Hm.  I spent some time cleaning up this patch, and found that there's
still a problem.  ISTM that the row with value "3" ought to end up
in the subscriber's sch2.t1 table, but it does not: the attached
test script fails with

t/100_bugs.pl .. 6/? 
#   Failed test 'check data in subscriber sch2.t1 after schema rename'
#   at t/100_bugs.pl line 361.
#          got: ''
#     expected: '3'
# Looks like you failed 1 test of 9.
t/100_bugs.pl .. Dubious, test returned 1 (wstat 256, 0x100)
Failed 1/9 subtests 

What's up with that?

                        regards, tom lane

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 7737242516..876adab38e 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1929,7 +1929,22 @@ init_rel_sync_cache(MemoryContext cachectx)
 
 	Assert(RelationSyncCache != NULL);
 
+	/* We must update the cache entry for a relation after a relcache flush */
 	CacheRegisterRelcacheCallback(rel_sync_cache_relation_cb, (Datum) 0);
+
+	/*
+	 * Flush all cache entries after a pg_namespace change, in case it was a
+	 * schema rename affecting a relation being replicated.
+	 */
+	CacheRegisterSyscacheCallback(NAMESPACEOID,
+								  rel_sync_cache_publication_cb,
+								  (Datum) 0);
+
+	/*
+	 * Flush all cache entries after any publication changes.  (We need no
+	 * callback entry for pg_publication, because publication_invalidation_cb
+	 * will take care of it.)
+	 */
 	CacheRegisterSyscacheCallback(PUBLICATIONRELMAP,
 								  rel_sync_cache_publication_cb,
 								  (Datum) 0);
@@ -2325,8 +2340,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 /*
  * Publication relation/schema map syscache invalidation callback
  *
- * Called for invalidations on pg_publication, pg_publication_rel, and
- * pg_publication_namespace.
+ * Called for invalidations on pg_publication, pg_publication_rel,
+ * pg_publication_namespace, and pg_namespace.
  */
 static void
 rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
@@ -2337,14 +2352,14 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
 	/*
 	 * We can get here if the plugin was used in SQL interface as the
 	 * RelSchemaSyncCache is destroyed when the decoding finishes, but there
-	 * is no way to unregister the relcache invalidation callback.
+	 * is no way to unregister the invalidation callbacks.
 	 */
 	if (RelationSyncCache == NULL)
 		return;
 
 	/*
-	 * There is no way to find which entry in our cache the hash belongs to so
-	 * mark the whole cache as invalid.
+	 * We have no easy way to identify which cache entries this invalidation
+	 * event might have affected, so just mark them all invalid.
 	 */
 	hash_seq_init(&status, RelationSyncCache);
 	while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl
index 143caac792..de827f8777 100644
--- a/src/test/subscription/t/100_bugs.pl
+++ b/src/test/subscription/t/100_bugs.pl
@@ -70,9 +70,10 @@ $node_publisher->wait_for_catchup('sub1');
 pass('index predicates do not cause crash');
 
 # We'll re-use these nodes below, so drop their replication state.
-# We don't bother to drop the tables though.
 $node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION sub1");
 $node_publisher->safe_psql('postgres', "DROP PUBLICATION pub1");
+# Drop the tables too.
+$node_publisher->safe_psql('postgres', "DROP TABLE tab1");
 
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
@@ -307,6 +308,58 @@ is( $node_subscriber->safe_psql(
 	qq(-1|1),
 	"update works with REPLICA IDENTITY");
 
+# Clean up
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub");
+$node_publisher->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+$node_subscriber->safe_psql('postgres', "DROP TABLE tab_replidentity_index");
+
+# Test schema invalidation by renaming the schema
+
+# Create tables on publisher
+$node_publisher->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_publisher->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+
+# Create tables on subscriber
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch1");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch1.t1 (c1 int)");
+$node_subscriber->safe_psql('postgres', "CREATE SCHEMA sch2");
+$node_subscriber->safe_psql('postgres', "CREATE TABLE sch2.t1 (c1 int)");
+
+# Setup logical replication that will cover t1 under both schema names
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_sch FOR ALL TABLES");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_sch CONNECTION '$publisher_connstr' PUBLICATION tap_pub_sch"
+);
+
+# Wait for initial table sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Check what happens to data inserted before and after schema rename
+$node_publisher->safe_psql(
+	'postgres',
+	"begin;
+insert into sch1.t1 values(1);
+alter schema sch1 rename to sch2;
+create schema sch1;
+create table sch1.t1(c1 int);
+insert into sch1.t1 values(2);
+insert into sch2.t1 values(3);
+commit;");
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub_sch');
+
+# Subscriber's sch1.t1 should receive the row inserted into the new sch1.t1,
+# but not the row inserted into the old sch1.t1 post-rename.
+my $result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch1.t1");
+is( $result, qq(1
+2), 'check data in subscriber sch1.t1 after schema rename');
+
+# Instead, that row should appear in sch2.t1.
+$result = $node_subscriber->safe_psql('postgres', "SELECT * FROM sch2.t1");
+is($result, qq(3), 'check data in subscriber sch2.t1 after schema rename');
+
 $node_publisher->stop('fast');
 $node_subscriber->stop('fast');
 

Reply via email to