From 2ea131eb052c0e4c1c78f011d156e795b5640a58 Mon Sep 17 00:00:00 2001
From: Mikhail Kharitonov <m.kharitonov@postgrespro.ru>
Date: Tue, 8 Jul 2025 09:27:56 +0300
Subject: [PATCH v2] Logical replication: fix replica identity flag for
 partitioned tables

With publish_via_partition_root = true, UPDATE and DELETE rows that land in a
partition without REPLICA IDENTITY FULL were sent with tuple flag 'O' (old
tuple) even though only primary-key columns were included.  This violates the
logical-replication protocol and can confuse external CDC tools that rely
on the flag.

The flag is now chosen according to the replica identity of the partition
that actually stores the row: partitions with REPLICA IDENTITY FULL still use
'O', all others use 'K'.

A TAP test (src/test/subscription/t/036_partition_replica_identity.pl)
covers both cases.

Author: Mikhail Kharitonov <mikhail.kharitonov.dev@gmail.com>
Discussion: https://www.postgresql.org/message-id/CAKkoVatDVuH%3DAcnecrZSOQ0_Md6RfW4ExvCKJzJJ4JTrX1wbxQ%40mail.gmail.com
---
 src/backend/replication/logical/proto.c       |  32 +++--
 src/backend/replication/pgoutput/pgoutput.c   |   4 +-
 src/include/replication/logicalproto.h        |   6 +-
 .../t/036_partition_replica_identity.pl       | 135 ++++++++++++++++++
 4 files changed, 158 insertions(+), 19 deletions(-)
 create mode 100755 src/test/subscription/t/036_partition_replica_identity.pl

diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 1a352b542dc..935a39faf20 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -447,36 +447,37 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup)
  * Write UPDATE to the output stream.
  */
 void
-logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_update(StringInfo out, TransactionId xid, 
+						Relation real_relation, Relation send_as,
 						TupleTableSlot *oldslot, TupleTableSlot *newslot,
 						bool binary, Bitmapset *columns,
 						PublishGencolsType include_gencols_type)
 {
 	pq_sendbyte(out, LOGICAL_REP_MSG_UPDATE);
 
-	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+	Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
 	/* transaction ID (if not valid, we're not streaming) */
 	if (TransactionIdIsValid(xid))
 		pq_sendint32(out, xid);
 
 	/* use Oid as relation identifier */
-	pq_sendint32(out, RelationGetRelid(rel));
+	pq_sendint32(out, RelationGetRelid(send_as));
 
 	if (oldslot != NULL)
 	{
-		if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+		if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 			pq_sendbyte(out, 'O');	/* old tuple follows */
 		else
 			pq_sendbyte(out, 'K');	/* old key follows */
-		logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+		logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
 							   include_gencols_type);
 	}
 
 	pq_sendbyte(out, 'N');		/* new tuple follows */
-	logicalrep_write_tuple(out, rel, newslot, binary, columns,
+	logicalrep_write_tuple(out, send_as, newslot, binary, columns,
 						   include_gencols_type);
 }
 
@@ -525,14 +526,15 @@ logicalrep_read_update(StringInfo in, bool *has_oldtuple,
  * Write DELETE to the output stream.
  */
 void
-logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
+logicalrep_write_delete(StringInfo out, TransactionId xid,
+						Relation real_relation, Relation send_as,
 						TupleTableSlot *oldslot, bool binary,
 						Bitmapset *columns,
 						PublishGencolsType include_gencols_type)
 {
-	Assert(rel->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
-		   rel->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
+	Assert(real_relation->rd_rel->relreplident == REPLICA_IDENTITY_DEFAULT ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL ||
+		   real_relation->rd_rel->relreplident == REPLICA_IDENTITY_INDEX);
 
 	pq_sendbyte(out, LOGICAL_REP_MSG_DELETE);
 
@@ -541,14 +543,14 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel,
 		pq_sendint32(out, xid);
 
 	/* use Oid as relation identifier */
-	pq_sendint32(out, RelationGetRelid(rel));
+	pq_sendint32(out, RelationGetRelid(send_as));
 
-	if (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
+	if (real_relation->rd_rel->relreplident == REPLICA_IDENTITY_FULL)
 		pq_sendbyte(out, 'O');	/* old tuple follows */
 	else
 		pq_sendbyte(out, 'K');	/* old key follows */
 
-	logicalrep_write_tuple(out, rel, oldslot, binary, columns,
+	logicalrep_write_tuple(out, send_as, oldslot, binary, columns,
 						   include_gencols_type);
 }
 
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 082b4d9d327..261be60b8f4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1587,12 +1587,12 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 									relentry->include_gencols_type);
 			break;
 		case REORDER_BUFFER_CHANGE_UPDATE:
-			logicalrep_write_update(ctx->out, xid, targetrel, old_slot,
+			logicalrep_write_update(ctx->out, xid, relation, targetrel, old_slot,
 									new_slot, data->binary, relentry->columns,
 									relentry->include_gencols_type);
 			break;
 		case REORDER_BUFFER_CHANGE_DELETE:
-			logicalrep_write_delete(ctx->out, xid, targetrel, old_slot,
+			logicalrep_write_delete(ctx->out, xid, relation, targetrel, old_slot,
 									data->binary, relentry->columns,
 									relentry->include_gencols_type);
 			break;
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index b261c60d3fa..8940c31b4f1 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -228,7 +228,8 @@ extern void logicalrep_write_insert(StringInfo out, TransactionId xid,
 									PublishGencolsType include_gencols_type);
 extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup);
 extern void logicalrep_write_update(StringInfo out, TransactionId xid,
-									Relation rel, TupleTableSlot *oldslot,
+									Relation real_relation, Relation send_as,
+									TupleTableSlot *oldslot,
 									TupleTableSlot *newslot, bool binary,
 									Bitmapset *columns,
 									PublishGencolsType include_gencols_type);
@@ -236,7 +237,8 @@ extern LogicalRepRelId logicalrep_read_update(StringInfo in,
 											  bool *has_oldtuple, LogicalRepTupleData *oldtup,
 											  LogicalRepTupleData *newtup);
 extern void logicalrep_write_delete(StringInfo out, TransactionId xid,
-									Relation rel, TupleTableSlot *oldslot,
+									Relation real_relation, Relation send_as,
+									TupleTableSlot *oldslot,
 									bool binary, Bitmapset *columns,
 									PublishGencolsType include_gencols_type);
 extern LogicalRepRelId logicalrep_read_delete(StringInfo in,
diff --git a/src/test/subscription/t/036_partition_replica_identity.pl b/src/test/subscription/t/036_partition_replica_identity.pl
new file mode 100755
index 00000000000..f69bca5ebe2
--- /dev/null
+++ b/src/test/subscription/t/036_partition_replica_identity.pl
@@ -0,0 +1,135 @@
+# Test logical replication with publish_via_partition_root,
+# where the parent has REPLICA IDENTITY FULL, but one partition does not.
+#
+# Expected behavior:
+# - For partitions with REPLICA IDENTITY FULL, old tuple must be marked as 'O' and contain full row.
+# - For partitions with REPLICA IDENTITY DEFAULT, old tuple should be marked as 'K' and contain only key columns.
+
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+sub log_state
+{
+  my ($node, $label) = @_;
+
+  my $rows = $node->safe_psql('postgres', q{
+    SELECT tableoid::regclass AS part,
+           id,
+           to_char(ts,'YYYY-MM-DD') AS ts,
+           load
+    FROM   part_table
+    ORDER  BY 1,2;
+  });
+  diag "----- $label: rows -----\n$rows\n";
+}
+
+my $pub = PostgreSQL::Test::Cluster->new('publisher');
+$pub->init(allows_streaming => 'logical');
+$pub->start;
+
+my $sub = PostgreSQL::Test::Cluster->new('subscriber');
+$sub->init;
+$sub->start;
+
+$pub->safe_psql('postgres', q{
+create table part_table(
+  id  int generated always as identity,
+  ts  timestamp,
+  load text,
+  constraint part_table_pk primary key(id, ts)
+) partition by range(ts);
+
+create table part_table_sect_1 partition of part_table
+  for values from ('2000-01-01') to ('2024-01-01');
+create table part_table_sect_2 partition of part_table
+  for values from ('2024-01-01') to (maxvalue);
+
+alter table part_table        replica identity full;
+alter table part_table_sect_1 replica identity full;
+
+create publication pub_part_table
+  for table part_table
+  with (publish_via_partition_root = true);
+});
+
+$pub->safe_psql('postgres',
+  q{select pg_create_logical_replication_slot('slot_test', 'pgoutput');});
+
+$sub->safe_psql('postgres', q{
+create table part_table(
+  id  int,
+  ts  timestamp,
+  load text,
+  constraint part_table_pk primary key(id, ts)
+) partition by range(ts);
+
+create table part_table_sect_1 partition of part_table
+  for values from ('2000-01-01') to ('2024-01-01');
+create table part_table_sect_2 partition of part_table
+  for values from ('2024-01-01') to (maxvalue);
+});
+
+my $connstr = $pub->connstr . ' dbname=postgres';
+$sub->safe_psql('postgres', qq{
+create subscription sub_part
+  connection '$connstr application_name=sub_part'
+  publication pub_part_table;});
+
+$sub->wait_for_subscription_sync($pub, 'sub_part');
+
+$pub->safe_psql('postgres', q{
+insert into part_table values (default, '2020-01-01 00:00', 'first');
+insert into part_table values (default, '2025-01-01 00:00', 'second');
+});
+$sub->wait_for_subscription_sync($pub, 'sub_part');
+diag("\n");
+log_state($pub, 'publisher after insert');
+log_state($sub, 'subscriber after insert');
+
+$pub->safe_psql('postgres',
+  q{update part_table set ts = ts + interval '1 day';});
+$sub->wait_for_subscription_sync($pub, 'sub_part');
+
+log_state($pub, 'publisher after update');
+log_state($sub, 'subscriber after update');
+
+$pub->safe_psql('postgres', q{delete from part_table;});
+$sub->wait_for_subscription_sync($pub, 'sub_part');
+
+log_state($pub, 'publisher after delete');
+log_state($sub, 'subscriber after delete');
+
+my $wal = $pub->safe_psql('postgres', q{
+select string_agg(encode(data, 'escape'),'')
+  from pg_logical_slot_get_binary_changes(
+         'slot_test', null, null,
+         'proto_version','1',
+         'publication_names','pub_part_table');
+});
+
+diag("---- WAL stream ----\n$wal\n");
+
+# 1: first partition has REPLICA IDENTITY FULL - full old tuple
+# (see - https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html#PROTOCOL-LOGICALREP-MESSAGE-FORMATS-UPDATE)
+like(
+  $wal,
+  qr/U.*O.*first.*first/s,
+  'partition WITH REPLICA IDENTITY FULL contains full old tuple'
+);
+
+# 2: second partition has REPLICA IDENTITY DEFAULT - only keys expected.
+if ($wal =~ /U.*K.*second/s)
+{
+    pass("Tag K correctly used for partition with REPLICA IDENTITY DEFAULT");
+}
+elsif ($wal =~ /(U.*O.*second)/s)
+{
+    my $blk = $1;
+    my $count = () = $blk =~ /second/g;
+    is($count, 2, "Tag O used but this partition with REPLICA IDENTITY DEFAULT");
+}
+
+done_testing();
-- 
2.34.1

