From 742410c3be8c3222a354655fc0e3c198bd9562ad Mon Sep 17 00:00:00 2001
From: amitlan <amitlangote09@gmail.com>
Date: Fri, 17 Apr 2020 23:32:05 +0900
Subject: [PATCH 1/2] Rearrange some code in pgoutput.c

pgoutput_change() has grown some code due to recent partitioning
support commits that looks repetitive.

Change where RelationSyncEntry.map is set so that it appears to be
at a less random location.
---
 src/backend/replication/pgoutput/pgoutput.c | 122 ++++++++++++++--------------
 1 file changed, 59 insertions(+), 63 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 77b85fc..c499410 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -299,15 +299,7 @@ maybe_send_schema(LogicalDecodingContext *ctx,
 	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
 		Relation	ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-		TupleDesc	indesc = RelationGetDescr(relation);
-		TupleDesc	outdesc = RelationGetDescr(ancestor);
-		MemoryContext oldctx;
-
-		/* Map must live as long as the session does. */
-		oldctx = MemoryContextSwitchTo(CacheMemoryContext);
-		relentry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
-											   CreateTupleDescCopy(outdesc));
-		MemoryContextSwitchTo(oldctx);
+
 		send_relation_and_attrs(ancestor, ctx);
 		RelationClose(ancestor);
 	}
@@ -363,6 +355,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
 	MemoryContext old;
 	RelationSyncEntry *relentry;
+	Relation	ancestor = NULL;
+	HeapTuple	oldtuple;
+	HeapTuple	newtuple;
 
 	if (!is_publishable_relation(relation))
 		return;
@@ -393,67 +388,43 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	maybe_send_schema(ctx, relation, relentry);
 
+	/* Switch relation if publishing via ancestor. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
+	{
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		relation = ancestor;
+	}
+
+	/* Set oldtuple/newtuple and convert to ancestor rowtype if necessary. */
+	oldtuple = change->data.tp.oldtuple ?
+			&change->data.tp.oldtuple->tuple : NULL;
+	if (oldtuple && relentry->map)
+		oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
+
+	newtuple = change->data.tp.newtuple ?
+			&change->data.tp.newtuple->tuple : NULL;
+	if (newtuple && relentry->map)
+		newtuple = execute_attr_map_tuple(newtuple, relentry->map);
+
 	/* Send the data */
 	switch (change->action)
 	{
 		case REORDER_BUFFER_CHANGE_INSERT:
-			{
-				HeapTuple	tuple = &change->data.tp.newtuple->tuple;
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					relation = RelationIdGetRelation(relentry->publish_as_relid);
-					/* Convert tuple if needed. */
-					if (relentry->map)
-						tuple = execute_attr_map_tuple(tuple, relentry->map);
-				}
+			OutputPluginPrepareWrite(ctx, true);
+			logicalrep_write_insert(ctx->out, relation, newtuple);
+			OutputPluginWrite(ctx, true);
+			break;
 
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_insert(ctx->out, relation, tuple);
-				OutputPluginWrite(ctx, true);
-				break;
-			}
 		case REORDER_BUFFER_CHANGE_UPDATE:
-			{
-				HeapTuple	oldtuple = change->data.tp.oldtuple ?
-				&change->data.tp.oldtuple->tuple : NULL;
-				HeapTuple	newtuple = &change->data.tp.newtuple->tuple;
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					relation = RelationIdGetRelation(relentry->publish_as_relid);
-					/* Convert tuples if needed. */
-					if (relentry->map)
-					{
-						oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
-						newtuple = execute_attr_map_tuple(newtuple, relentry->map);
-					}
-				}
+			OutputPluginPrepareWrite(ctx, true);
+			logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
+			OutputPluginWrite(ctx, true);
+			break;
 
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_update(ctx->out, relation, oldtuple, newtuple);
-				OutputPluginWrite(ctx, true);
-				break;
-			}
 		case REORDER_BUFFER_CHANGE_DELETE:
-			if (change->data.tp.oldtuple)
+			if (oldtuple)
 			{
-				HeapTuple	oldtuple = &change->data.tp.oldtuple->tuple;
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					relation = RelationIdGetRelation(relentry->publish_as_relid);
-					/* Convert tuple if needed. */
-					if (relentry->map)
-						oldtuple = execute_attr_map_tuple(oldtuple, relentry->map);
-				}
-
 				OutputPluginPrepareWrite(ctx, true);
 				logicalrep_write_delete(ctx->out, relation, oldtuple);
 				OutputPluginWrite(ctx, true);
@@ -461,10 +432,14 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
 			break;
+
 		default:
 			Assert(false);
 	}
 
+	if (ancestor)
+		RelationClose(ancestor);
+
 	/* Cleanup */
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
@@ -636,8 +611,6 @@ static RelationSyncEntry *
 get_rel_sync_entry(PGOutputData *data, Oid relid)
 {
 	RelationSyncEntry *entry;
-	bool		am_partition = get_rel_relispartition(relid);
-	char		relkind = get_rel_relkind(relid);
 	bool		found;
 	MemoryContext oldctx;
 
@@ -657,6 +630,9 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		List	   *pubids = GetRelationPublications(relid);
 		ListCell   *lc;
 		Oid			publish_as_relid = relid;
+		Relation	relation = RelationIdGetRelation(relid);
+		bool		am_partition = relation->rd_rel->relispartition;
+		char		relkind = relation->rd_rel->relkind;
 
 		/* Reload publications if needed before use. */
 		if (!publications_valid)
@@ -746,7 +722,27 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
 		list_free(pubids);
 
 		entry->publish_as_relid = publish_as_relid;
+		/* Also set map while at it. */
+		if (publish_as_relid != relid)
+		{
+			Relation	ancestor = RelationIdGetRelation(publish_as_relid);
+			TupleDesc	indesc = RelationGetDescr(relation);
+			TupleDesc	outdesc = RelationGetDescr(ancestor);
+			MemoryContext oldctx;
+
+			/*
+			 * Map must live as long as the session does. TupleDescs must be
+			 * copied before putting into the map, because they may not live
+			 * as long as we want the map to live.
+			 */
+			oldctx = MemoryContextSwitchTo(CacheMemoryContext);
+			entry->map = convert_tuples_by_name(CreateTupleDescCopy(indesc),
+												CreateTupleDescCopy(outdesc));
+			MemoryContextSwitchTo(oldctx);
+			RelationClose(ancestor);
+		}
 		entry->replicate_valid = true;
+		RelationClose(relation);
 	}
 
 	if (!found)
-- 
1.8.3.1

