From e6bca40314cc515b33b27247768bff05cd42eeab Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Tue, 18 May 2021 14:52:04 +0900
Subject: [PATCH v6 1/2] pgoutput: fix memory management for
 RelationSyncEntry.map

Release memory allocated when creating the tuple-conversion map
and its component TupleDescs when its owning sync entry is
invalidated.  TupleDescs must also be freed when no map is deemed
necessary to begin with.
---
 src/backend/replication/pgoutput/pgoutput.c | 49 +++++++++++++++++----
 src/test/subscription/t/013_partition.pl    | 28 +++++++++++-
 2 files changed, 68 insertions(+), 9 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index c5fbebf55a..c691c063cd 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -64,10 +64,6 @@ typedef struct RelationSyncEntry
 {
 	Oid			relid;			/* relation oid */
 
-	/*
-	 * Did we send the schema?  If ancestor relid is set, its schema must also
-	 * have been sent for this to be true.
-	 */
 	bool		schema_sent;
 
 	bool		replicate_valid;
@@ -292,10 +288,17 @@ static void
 maybe_send_schema(LogicalDecodingContext *ctx,
 				  Relation relation, RelationSyncEntry *relentry)
 {
+	/* Nothing to do if we already sent the schema. */
 	if (relentry->schema_sent)
 		return;
 
-	/* If needed, send the ancestor's schema first. */
+	/*
+	 * Nope, so send the schema.  If the changes will be published using an
+	 * ancestor's schema, not the relation's own, send that ancestor's schema
+	 * before sending relation's own (XXX - maybe sending only the former
+	 * suffices?).  This is also a good place to set the map that will be used
+	 * to convert the relation's tuples into the ancestor's format, if needed.
+	 */
 	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
 		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);
@@ -303,16 +306,31 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 		TupleDesc	outdesc = RelationGetDescr(ancestor);
 		MemoryContext oldctx;
 
+		send_relation_and_attrs(ancestor, ctx);
+
 		/* Map must live as long as the session does. */
 		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
-											   CreateTupleDescCopy(outdesc));
+
+		/*
+		 * Make copies of the TupleDescs that will live as long as the map
+		 * does before putting into the map.
+		 */
+		indesc = CreateTupleDescCopy(indesc);
+		outdesc = CreateTupleDescCopy(outdesc);
+		relentry->map = convert_tuples_by_name(indesc, outdesc);
+		if (relentry->map == NULL)
+		{
+			/* Map not necessary, so free the TupleDescs too. */
+			FreeTupleDesc(indesc);
+			FreeTupleDesc(outdesc);
+		}
+
 		MemoryContextSwitchTo(oldctx);
-		send_relation_and_attrs(ancestor, ctx);
 		RelationClose(ancestor);
 	}
 
 	send_relation_and_attrs(relation, ctx);
+
 	relentry->schema_sent = true;
 }
 
@@ -759,6 +777,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		list_free(pubids);
 
 		entry->publish_as_relid = publish_as_relid;
+		entry->map = NULL;	/* will be set by maybe_send_schema() if needed */
 		entry->replicate_valid = true;
 	}
 
@@ -801,9 +820,23 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid)
 
 	/*
 	 * Reset schema sent status as the relation definition may have changed.
+	 * Also, free any objects that depended on the earlier definition.
 	 */
 	if (entry != NULL)
+	{
 		entry->schema_sent = false;
+		if (entry->map)
+		{
+			/*
+			 * Must free the TupleDescs contained in the map explicitly,
+			 * because free_conversion_map() doesn't.
+			 */
+			FreeTupleDesc(entry->map->indesc);
+			FreeTupleDesc(entry->map->outdesc);
+			free_conversion_map(entry->map);
+		}
+		entry->map = NULL;
+	}
 }
 
 /*
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index a04c03a7e2..a59de0b28c 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 51;
+use Test::More tests => 53;
 
 # setup
 
@@ -532,3 +532,29 @@ is($result, qq(), 'truncate of tab3 replicated');
 
 $result = $node_subscriber2->safe_psql('postgres', "SELECT a FROM tab3_1");
 is($result, qq(), 'truncate of tab3_1 replicated');
+
+# check that the map to convert tuples from leaf partition to the root
+# table is correctly rebuilt when a new column is added
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE tab2 DROP b, ADD COLUMN c text DEFAULT 'pub_tab2', ADD b text");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 (a, b) VALUES (1, 'xxx'), (3, 'yyy'), (5, 'zzz')");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO tab2 (a, b, c) VALUES (6, 'aaa', 'xxx_c')");
+
+$node_publisher->wait_for_catchup('sub_viaroot');
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber1->safe_psql('postgres',
+	"SELECT c, a, b FROM tab2 ORDER BY 1, 2");
+is( $result, qq(pub_tab2|1|xxx
+pub_tab2|3|yyy
+pub_tab2|5|zzz
+xxx_c|6|aaa), 'inserts into tab2 replicated');
+
+$result = $node_subscriber2->safe_psql('postgres',
+	"SELECT c, a, b FROM tab2 ORDER BY 1, 2");
+is( $result, qq(pub_tab2|1|xxx
+pub_tab2|3|yyy
+pub_tab2|5|zzz
+xxx_c|6|aaa), 'inserts into tab2 replicated');
-- 
2.24.1

