From 48e288241b4c79581ecdd473cceac15985c8c681 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Date: Mon, 5 May 2025 10:14:37 +0300
Subject: [PATCH] Fix replica identity flags for partitioned tables with
 publish_via_partition_root

When using publish_via_partition_root and REPLICA IDENTITY FULL is set only
on the root partitioned table (not all leaf partitions), UPDATE and DELETE
messages may incorrectly use the 'O' (old tuple) tag, even though only the
primary key fields are sent.

This leads to protocol inconsistencies, since the subscriber interprets the
message as containing a full tuple, while it does not.

This patch ensures that the correct replica identity tag ('O' or 'K') is used
based on the actual leaf partition's REPLICA IDENTITY setting, rather than
the root.
---
 src/backend/replication/logical/proto.c     | 32 +++++++++++----------
 src/backend/replication/pgoutput/pgoutput.c |  4 +--
 src/include/replication/logicalproto.h      |  6 ++--
 3 files changed, 23 insertions(+), 19 deletions(-)

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1a352b542dc..935a39faf20 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -447,36 +447,37 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  * Write UPDATE to the output stream.
  */
 void
-logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_update(StringInfo out, TransactionId xid, 
+						Relation real_relation, Relation send_as,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
 						bool binary, Bitmapset *columns,
 						PublishGencolsType include_gencols_type)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
-	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+	Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
 		pq_sendint32(out, xid);
 
 	/* use Oid as relation identifier */
-	pq_sendint32(out, RelationGetRelid(rel));
+	pq_sendint32(out, RelationGetRelid(send_as));
 
 	if (oldslot != NULL)
 	{
-		if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+		if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+		logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
 							   include_gencols_type);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+	logicalrep_write_tuple(out, send_as, newslot, binary, columns,
 						   include_gencols_type);
 }
 
@@ -525,14 +526,15 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
  * Write DELETE to the output stream.
  */
 void
-logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_delete(StringInfo out, TransactionId xid,
+						Relation real_relation, Relation send_as,
 						TupleTableSlot *oldslot, bool binary,
 						Bitmapset *columns,
 						PublishGencolsType include_gencols_type)
 {
-	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+	Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
 
@@ -541,14 +543,14 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 		pq_sendint32(out, xid);
 
 	/* use Oid as relation identifier */
-	pq_sendint32(out, RelationGetRelid(rel));
+	pq_sendint32(out, RelationGetRelid(send_as));
 
-	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+	if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 		pq_sendbyte(out, 'O');	/* old tuple follows */
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+	logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
 						   include_gencols_type);
 }
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 693a766e6d7..7c15be3894b 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1587,12 +1587,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									relentry->include_gencols_type);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
-			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+			logicalrep_write_update(ctx->out, xid, relation, targetrel, old_slot,
 									new_slot, data->binary, relentry->columns,
 									relentry->include_gencols_type);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
-			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+			logicalrep_write_delete(ctx->out, xid, relation, targetrel, old_slot,
 									data->binary, relentry->columns,
 									relentry->include_gencols_type);
 			break;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..8940c31b4f1 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -228,7 +228,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									PublishGencolsType include_gencols_type);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
-									Relation rel, TupleTableSlot *oldslot,
+									Relation real_relation, Relation send_as,
+									TupleTableSlot *oldslot,
 									TupleTableSlot *newslot, bool binary,
 									Bitmapset *columns,
 									PublishGencolsType include_gencols_type);
@@ -236,7 +237,8 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
-									Relation rel, TupleTableSlot *oldslot,
+									Relation real_relation, Relation send_as,
+									TupleTableSlot *oldslot,
 									bool binary, Bitmapset *columns,
 									PublishGencolsType include_gencols_type);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
-- 
2.34.1

