From 76ab9f02a71c3c2dcae7d87681a29c011ad67a98 Mon Sep 17 00:00:00 2001
From: Hou Zhijie <houzj.fnst@cn.fujitsu.com>
Date: Wed, 15 Mar 2023 15:57:13 +0800
Subject: [PATCH] simplify the code in pgoutput_change

---
 src/backend/replication/pgoutput/pgoutput.c | 204 +++++---------------
 1 file changed, 51 insertions(+), 153 deletions(-)

diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 00a2d73dab..fdaaaa1922 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1448,173 +1448,68 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	/* Avoid leaking memory by using and resetting our own context */
 	old = MemoryContextSwitchTo(data->context);
 
-	/* Send the data */
-	switch (action)
+	/* Switch relation if publishing via root. */
+	if (relentry->publish_as_relid != RelationGetRelid(relation))
 	{
-		case REORDER_BUFFER_CHANGE_INSERT:
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
+		Assert(relation->rd_rel->relispartition);
+		ancestor = RelationIdGetRelation(relentry->publish_as_relid);
+		targetrel = ancestor;
+	}
 
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuple if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
+	if (change->data.tp.newtuple)
+	{
+		new_slot = relentry->new_slot;
+		ExecStoreHeapTuple(&change->data.tp.newtuple->tuple, new_slot, false);
 
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
+		if (relentry->attrmap)
+			new_slot = execute_attr_map_slot(relentry->attrmap, new_slot,
+											 MakeTupleTableSlot(RelationGetDescr(targetrel),
+																&TTSOpsVirtual));
+	}
 
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, NULL, &new_slot, relentry,
-									 &action))
-				break;
+	if (change->data.tp.oldtuple)
+	{
+		old_slot = relentry->old_slot;
+		ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple, old_slot, false);
 
-			/*
-			 * Send BEGIN if we haven't yet.
-			 *
-			 * We send the BEGIN message after ensuring that we will actually
-			 * send the change. This avoids sending a pair of BEGIN/COMMIT
-			 * messages for empty transactions.
-			 */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
+		if (relentry->attrmap)
+			old_slot = execute_attr_map_slot(relentry->attrmap, old_slot,
+											 MakeTupleTableSlot(RelationGetDescr(targetrel),
+																&TTSOpsVirtual));
+	}
 
-			/*
-			 * Schema should be sent using the original relation because it
-			 * also sends the ancestor's relation.
-			 */
-			maybe_send_schema(ctx, change, relation, relentry);
+	/*
+	 * Check row filter.
+	 *
+	 * Updates could be transformed to inserts or deletes based on the results
+	 * of the row filter for old and new tuple.
+	 */
+	if (!pgoutput_row_filter(targetrel, old_slot, &new_slot, relentry, &action))
+		goto cleanup;
 
-			OutputPluginPrepareWrite(ctx, true);
+	/* Send BEGIN if we haven't yet */
+	if (txndata && !txndata->sent_begin_txn)
+		pgoutput_send_begin(ctx, txn);
+
+	maybe_send_schema(ctx, change, relation, relentry);
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	/* Send the data */
+	switch (action)
+	{
+		case REORDER_BUFFER_CHANGE_INSERT:
 			logicalrep_write_insert(ctx->out, xid, targetrel, new_slot,
 									data->binary, relentry->columns);
-			OutputPluginWrite(ctx, true);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
-			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-			}
-
-			new_slot = relentry->new_slot;
-			ExecStoreHeapTuple(&change->data.tp.newtuple->tuple,
-							   new_slot, false);
-
-			/* Switch relation if publishing via root. */
-			if (relentry->publish_as_relid != RelationGetRelid(relation))
-			{
-				Assert(relation->rd_rel->relispartition);
-				ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-				targetrel = ancestor;
-				/* Convert tuples if needed. */
-				if (relentry->attrmap)
-				{
-					TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-					if (old_slot)
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-
-					new_slot = execute_attr_map_slot(relentry->attrmap,
-													 new_slot,
-													 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-				}
-			}
-
-			/* Check row filter */
-			if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-									 relentry, &action))
-				break;
-
-			/* Send BEGIN if we haven't yet */
-			if (txndata && !txndata->sent_begin_txn)
-				pgoutput_send_begin(ctx, txn);
-
-			maybe_send_schema(ctx, change, relation, relentry);
-
-			OutputPluginPrepareWrite(ctx, true);
-
-			/*
-			 * Updates could be transformed to inserts or deletes based on the
-			 * results of the row filter for old and new tuple.
-			 */
-			switch (action)
-			{
-				case REORDER_BUFFER_CHANGE_INSERT:
-					logicalrep_write_insert(ctx->out, xid, targetrel,
-											new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_UPDATE:
-					logicalrep_write_update(ctx->out, xid, targetrel,
-											old_slot, new_slot, data->binary,
-											relentry->columns);
-					break;
-				case REORDER_BUFFER_CHANGE_DELETE:
-					logicalrep_write_delete(ctx->out, xid, targetrel,
-											old_slot, data->binary,
-											relentry->columns);
-					break;
-				default:
-					Assert(false);
-			}
-
-			OutputPluginWrite(ctx, true);
+			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+									new_slot, data->binary, relentry->columns);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
 			if (change->data.tp.oldtuple)
-			{
-				old_slot = relentry->old_slot;
-
-				ExecStoreHeapTuple(&change->data.tp.oldtuple->tuple,
-								   old_slot, false);
-
-				/* Switch relation if publishing via root. */
-				if (relentry->publish_as_relid != RelationGetRelid(relation))
-				{
-					Assert(relation->rd_rel->relispartition);
-					ancestor = RelationIdGetRelation(relentry->publish_as_relid);
-					targetrel = ancestor;
-					/* Convert tuple if needed. */
-					if (relentry->attrmap)
-					{
-						TupleDesc	tupdesc = RelationGetDescr(targetrel);
-
-						old_slot = execute_attr_map_slot(relentry->attrmap,
-														 old_slot,
-														 MakeTupleTableSlot(tupdesc, &TTSOpsVirtual));
-					}
-				}
-
-				/* Check row filter */
-				if (!pgoutput_row_filter(targetrel, old_slot, &new_slot,
-										 relentry, &action))
-					break;
-
-				/* Send BEGIN if we haven't yet */
-				if (txndata && !txndata->sent_begin_txn)
-					pgoutput_send_begin(ctx, txn);
-
-				maybe_send_schema(ctx, change, relation, relentry);
-
-				OutputPluginPrepareWrite(ctx, true);
-				logicalrep_write_delete(ctx->out, xid, targetrel,
-										old_slot, data->binary,
-										relentry->columns);
-				OutputPluginWrite(ctx, true);
-			}
+				logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+										data->binary, relentry->columns);
 			else
 				elog(DEBUG1, "didn't send DELETE change because of missing oldtuple");
 			break;
@@ -1622,6 +1517,9 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 			Assert(false);
 	}
 
+	OutputPluginWrite(ctx, true);
+
+cleanup:
 	if (RelationIsValid(ancestor))
 	{
 		RelationClose(ancestor);
-- 
2.30.0.windows.2

