Hi Dilip,

I have created a couple of POC patches for the tablesync side of this.
The main challenge on the tablesync side is permissions, because
pg_largeobject is owned by the superuser. I considered a few solutions
and have patches for two of them.

1. One simple solution is to allow only the superuser to set up large
object replication. This could be enabled with a command that adds
large object replication to a publication, or perhaps with a GUC.
Ideally we don't want every publication to publish large objects, so a
publication-level command seems preferable.

While the enablement mechanism is still an open question, I have
implemented the POC by adding the pg_largeobject table to the result of
pg_get_publication_tables. In the final implementation we might also
add the pg_largeobject_metadata table.
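
With the POC applied, pg_largeobject should then show up for every
publication, e.g. through the pg_publication_tables view that is built
on top of that function (output below is illustrative):

    SELECT schemaname, tablename
    FROM pg_publication_tables
    WHERE pubname = 'mypub';
    -- ... the published user tables ...
    -- pg_catalog | pg_largeobject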

The patch is attached as
POC-v3-0003-Tablesync-for-large-objects-for-superuser.patch. An
alternative would be to handle this special case on the subscriber
side, but doing it on the publisher side is simpler. We might also end
up needing setup on both the publisher and subscriber sides.

For full enablement we'll need this patch, the existing apply worker
patch for lo_write, and a patch supporting lo_create on the apply
worker side.

2. An alternative solution is to allow a subscription owner to copy
only the large object entries owned by its counterpart on the
publisher. This lets all users replicate large objects, and it also
allows multiple subscriptions to set up large object replication. The
enablement question from the first approach remains open. We can also
assume that a patch supporting lo_create on the apply worker will be
created along with this change.

For implementation, we can have two tablesync workers: one for
pg_largeobject_metadata and one for pg_largeobject. The
pg_largeobject_metadata worker can use lo_create to create the large
object entries, or it can simply copy the rows since permissions are
not an issue for that table. The pg_largeobject worker will then use
lo_write/lo_put to insert the data for the entries owned by the
subscription owner.
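
As a rough sketch (OID and data are illustrative), the work done by the
two workers for each large object received from the publisher would
boil down to the following on the subscriber:

    SELECT lo_create(16970);                      -- metadata worker: create the LO
    SELECT lo_put(16970, 0, '\x6c6f2064617461');  -- data worker: write a chunk at offset 0

Note that lo_put requires the large object to exist already, which is
exactly the ordering dependency discussed next.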

The major challenge with this approach is the synchronization required
between the pg_largeobject and pg_largeobject_metadata tablesync
workers. For a plain table copy (as suggested for superusers in the
first approach), integrity checks are turned off and the two tablesync
workers can run in parallel without issues. However, explicitly using
lo_write checks for the large object's existence, which can lead to
failures if the metadata row has not been created yet.

3. To avoid the synchronization problem above, I experimented with a
modified approach. Instead of having a separate tablesync worker for
pg_largeobject_metadata, I created only one tablesync worker for
pg_largeobject. It differs from a regular tablesync worker in the
following ways:

a. Instead of a copy command, it runs "select m.oid, lo_get(m.oid)
from pg_largeobject_metadata as m join pg_user as u on m.lomowner =
u.usesysid where u.usename = CURRENT_USER;"

This returns all large objects owned by the connecting user on the publisher.

b. Then I use lo_from_bytea (a refactored version that doesn't need
fmgr) to both create and write the large object, which removes the need
for a separate lo_create.
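
The refactored helper exported by patch 0004 has this shape:

    /* create the large object with the given OID and write str into it */
    extern void lo_from_bytea(Oid loOid, bytea *str);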

POC-v3-0004-Enable-tablesync-for-large-objects-for-all-users.patch
contains this implementation. I tested it successfully for a small set
of large objects. This patch, along with support for lo_create in the
apply worker, should provide full support for large object replication.

Note: We should also modify the apply worker code to replicate only
the lo_writes on objects owned by the subscription user. I have not
made that change in the POC but can do it in the next version; a rough
sketch of the check is below.
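
A minimal sketch of the check the apply worker would perform
(LargeObjectOwnedByUser() is a hypothetical helper; the real check
would look up pg_largeobject_metadata.lomowner):

    /* Sketch only: skip writes to large objects the apply user does not own. */
    if (!LargeObjectOwnedByUser(loid, GetUserId()))
        return;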

One major concern here is tablesync performance. That said, I expect
most users to set up replication at the start, or to convert a physical
replica into a logical replica, so in many cases this cost would not be
incurred.

4. If we want a more performant version, one idea is to support bulk
writes for large objects, which would speed up the approach above. I
have not analyzed the work required; suggestions on this would be
welcome.

Thanks.


Nitin Motiani
Google
From 678ce86f4294e0c6bce9f67cc00437c045b9dc43 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Mon, 2 Feb 2026 09:00:36 +0000
Subject: [PATCH v3 3/4] Tablesync for large objects for superuser

* Add the large object table to the output of pg_get_publication_tables.

* This approach ensures that it is handled without any special logic on the subscriber side.

* For POC, we have only added pg_largeobject. We can do the same for pg_largeobject_metadata.

* Due to the permissions around pg_largeobject, this will only work for superusers.

* In the final implementation this behaviour will be controlled by a flag or some other method
  which enables large object replication. For the POC, we have done it for all publications.
---
 src/backend/catalog/pg_publication.c    | 8 ++++++++
 src/backend/commands/subscriptioncmds.c | 9 +++++++++
 2 files changed, 17 insertions(+)

diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c
index 9a4791c573e..f68fef6c4f3 100644
--- a/src/backend/catalog/pg_publication.c
+++ b/src/backend/catalog/pg_publication.c
@@ -1186,6 +1186,14 @@ pg_get_publication_tables(PG_FUNCTION_ARGS)
 				pub_elem_tables = list_concat_unique_oid(relids, schemarelids);
 			}
 
+			/*
+			 * TODO : For poc, just add pg_largeobject relid to the
+			 * publication. We need to figure out the long term approach to do
+			 * this and also decide which publication would publish the large
+			 * objects.
+			 */
+			pub_elem_tables = lappend_oid(pub_elem_tables, LargeObjectRelationId);
+
 			/*
 			 * Record the published table and the corresponding publication so
 			 * that we can get row filters and column lists later.
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 0b3c8499b49..3014b66ec91 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -2987,6 +2987,15 @@ fetch_relation_list(WalReceiverConn *wrconn, List *publications)
 
 	walrcv_clear_result(res);
 
+	/*
+	 * TODO : Might need a check or special code here to include the
+	 * large object tables. In the POC, we are changing
+	 * pg_get_publication_tables. But that might change. We also might want to
+	 * have a combination of both - the publication provides the large object
+	 * tables and the subscription has a flag to decide if it wants to
+	 * replicate large objects or not.
+	 */
+
 	return relationlist;
 }
 
-- 
2.53.0.rc1.225.gd81095ad13-goog

From e354f683c01a03a0fb4d309f8b66f2228aa7091b Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Wed, 31 Dec 2025 09:09:20 +0000
Subject: [PATCH v3 2/4] Add support to apply lo_write operations in
 applyworker

* Handle REORDER_BUFFER_CHANGE_LOWRITE operations on apply worker.
* This is done by using lo_put. This patch refactors lo_put to provide
  a version which is not fmgr-callable but is available to the C code.
* This patch currently does not handle large objects in tablesync.
---
 src/backend/libpq/be-fsstubs.c           | 24 +++++++++++++++---------
 src/backend/replication/logical/worker.c | 21 +++++++++++++++++++++
 src/include/libpq/be-fsstubs.h           |  1 +
 3 files changed, 37 insertions(+), 9 deletions(-)

diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c
index f27e374c4ee..ffabad9b708 100644
--- a/src/backend/libpq/be-fsstubs.c
+++ b/src/backend/libpq/be-fsstubs.c
@@ -202,6 +202,20 @@ lo_write(int fd, const char *buf, int len)
 	return status;
 }
 
+void
+lo_put(Oid loOid, int64 offset, const char *str, int len)
+{
+	LargeObjectDesc *loDesc;
+	int			written PG_USED_FOR_ASSERTS_ONLY;
+
+	lo_cleanup_needed = true;
+	loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext);
+	inv_seek(loDesc, offset, SEEK_SET);
+	written = inv_write(loDesc, str, len);
+	Assert(written == len);
+	inv_close(loDesc);
+}
+
 Datum
 be_lo_lseek(PG_FUNCTION_ARGS)
 {
@@ -857,17 +871,9 @@ be_lo_put(PG_FUNCTION_ARGS)
 	Oid			loOid = PG_GETARG_OID(0);
 	int64		offset = PG_GETARG_INT64(1);
 	bytea	   *str = PG_GETARG_BYTEA_PP(2);
-	LargeObjectDesc *loDesc;
-	int			written PG_USED_FOR_ASSERTS_ONLY;
 
 	PreventCommandIfReadOnly("lo_put()");
 
-	lo_cleanup_needed = true;
-	loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext);
-	inv_seek(loDesc, offset, SEEK_SET);
-	written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
-	Assert(written == VARSIZE_ANY_EXHDR(str));
-	inv_close(loDesc);
-
+	lo_put(loOid, offset, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
 	PG_RETURN_VOID();
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 32725c48623..88701da7717 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -261,6 +261,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/execPartition.h"
+#include "libpq/be-fsstubs.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
@@ -3163,6 +3164,23 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualEnd(&epqstate);
 }
 
+static void
+apply_handle_lowrite(StringInfo s)
+{
+	Oid			loid;
+	int64		offset;
+	Size		datalen;
+	char	   *data;
+
+	if (handle_streamed_transaction(LOGICAL_REP_MSG_LOWRITE, s))
+		return;
+
+	begin_replication_step();
+	logicalrep_read_lo_write(s, &loid, &offset, &datalen, &data);
+	lo_put(loid, offset, data, datalen);
+	end_replication_step();
+}
+
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
@@ -3868,6 +3886,9 @@ apply_dispatch(StringInfo s)
 			apply_handle_stream_prepare(s);
 			break;
 
+		case LOGICAL_REP_MSG_LOWRITE:
+			apply_handle_lowrite(s);
+			break;
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/include/libpq/be-fsstubs.h b/src/include/libpq/be-fsstubs.h
index 8775939f410..39e4e751b45 100644
--- a/src/include/libpq/be-fsstubs.h
+++ b/src/include/libpq/be-fsstubs.h
@@ -21,6 +21,7 @@
  */
 extern int	lo_read(int fd, char *buf, int len);
 extern int	lo_write(int fd, const char *buf, int len);
+extern void lo_put(Oid loOid, int64 offset, const char *str, int len);
 
 /*
  * Cleanup LOs at xact commit/abort
-- 
2.53.0.rc1.225.gd81095ad13-goog

From a9971b7d0c0e49d3b34e260521768bb8c6fe7ee4 Mon Sep 17 00:00:00 2001
From: Dilip Kumar <[email protected]>
Date: Mon, 29 Dec 2025 09:37:52 +0000
Subject: [PATCH v3 1/4] Support large object decoding

Introduce support for decoding changes to large objects in
logical replication.  Changes to 'pg_largeobject' are now
intercepted in 'heap_decode' based on LargeObjectRelationId.
Since a single large object operation (LO_WRITE) spans multiple
physical rows in 'pg_largeobject', the changes are decoded and
converted into a dedicated logical operation:
REORDER_BUFFER_CHANGE_LOWRITE.
---
 contrib/test_decoding/Makefile                |   2 +-
 .../expected/decoding_largeobject.out         | 216 ++++++++++++++++++
 .../sql/decoding_largeobject.sql              |  94 ++++++++
 contrib/test_decoding/test_decoding.c         |  39 ++++
 src/backend/replication/logical/decode.c      | 133 +++++++++++
 src/backend/replication/logical/proto.c       |  41 ++++
 .../replication/logical/reorderbuffer.c       |  61 +++++
 src/backend/replication/pgoutput/pgoutput.c   |  23 ++
 src/include/replication/logicalproto.h        |   5 +
 src/include/replication/reorderbuffer.h       |  12 +
 src/include/utils/rel.h                       |   6 +-
 11 files changed, 630 insertions(+), 2 deletions(-)
 create mode 100644 contrib/test_decoding/expected/decoding_largeobject.out
 create mode 100644 contrib/test_decoding/sql/decoding_largeobject.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index acbcaed2feb..d1f02500cf3 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
 	decoding_into_rel binary prepared replorigin time messages \
-	spill slot truncate stream stats twophase twophase_stream
+	spill slot truncate stream stats twophase twophase_stream decoding_largeobject
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
 	oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
 	twophase_snapshot slot_creation_error catalog_change_snapshot \
diff --git a/contrib/test_decoding/expected/decoding_largeobject.out b/contrib/test_decoding/expected/decoding_largeobject.out
new file mode 100644
index 00000000000..a2720d82064
--- /dev/null
+++ b/contrib/test_decoding/expected/decoding_largeobject.out
@@ -0,0 +1,216 @@
+-- test that we can insert into the large objects and decode the changes
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+-- Create a new large object
+CREATE TABLE lotest_stash_values (loid oid, fd integer);
+INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42);
+-- NOTE: large objects require transactions
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values;
+ lowrite 
+---------
+      22
+(1 row)
+
+SELECT lo_close(fd) FROM lotest_stash_values;
+ lo_close 
+----------
+        0
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                    data                                    
+----------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: INSERT: loid[oid]:16970 fd[integer]:null
+ COMMIT
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 0 datalen: 22 data: large object test data
+ COMMIT
+(7 rows)
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values;
+ lo_lseek 
+----------
+       10
+(1 row)
+
+SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values;
+ lowrite 
+---------
+      19
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                      data                                       
+---------------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 0 datalen: 29 data: large objeoverwrite some data
+ COMMIT
+(4 rows)
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values;
+ lo_lseek 
+----------
+     2048
+(1 row)
+
+SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values;
+ lowrite 
+---------
+      19
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                   data                                   
+--------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 2048 datalen: 19 data: write into new page
+ COMMIT
+(4 rows)
+
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, '
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+') FROM lotest_stash_values;
+ lowrite 
+---------
+    3829
+(1 row)
+
+SELECT lo_close(fd) FROM lotest_stash_values;
+ lo_close 
+----------
+        0
+(1 row)
+
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+                                              data                                               
+-------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0
+ LO_WRITE: loid: 16970 offset: 0 datalen: 2048 data:                                            +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data m
+ LO_WRITE: loid: 16970 offset: 2048 datalen: 1781 data: ore in 2048 test large data more in 2048+
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ test large data more in 2048 test large data more in 2048 test large data more in 2048         +
+ 
+ COMMIT
+(5 rows)
+
+-- Clean up the slot
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/decoding_largeobject.sql b/contrib/test_decoding/sql/decoding_largeobject.sql
new file mode 100644
index 00000000000..ff392de2fba
--- /dev/null
+++ b/contrib/test_decoding/sql/decoding_largeobject.sql
@@ -0,0 +1,94 @@
+-- test that we can insert into the large objects and decode the changes
+
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- slot works
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- Create a new large object
+CREATE TABLE lotest_stash_values (loid oid, fd integer);
+
+INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42);
+
+-- NOTE: large objects require transactions
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values;
+SELECT lo_close(fd) FROM lotest_stash_values;
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values;
+SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values;
+END;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+BEGIN;
+UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values;
+SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values;
+END;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+BEGIN;
+UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer));
+SELECT lowrite(fd, '
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+test large data more in 2048 test large data more in 2048 test large data more in 2048
+') FROM lotest_stash_values;
+SELECT lo_close(fd) FROM lotest_stash_values;
+END;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+
+-- Clean up the slot
+SELECT pg_drop_replication_slot('regression_slot');
\ No newline at end of file
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index d5cf0fa02b0..be3f9c860dd 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -470,6 +470,38 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static void
+pg_decode_lo_write(LogicalDecodingContext *ctx,
+				   ReorderBufferChange *change)
+{
+	TestDecodingData   *data;
+	MemoryContext		old;
+	Oid					loid = change->data.lo_write.loid;
+	int64				offset = change->data.lo_write.offset;
+	Size				datalen = change->data.lo_write.datalen;
+	char			   *lodata = change->data.lo_write.data;
+
+	data = ctx->output_plugin_private;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfo(ctx->out, "LO_WRITE:");
+	appendStringInfo(ctx->out, " loid: %u offset: " INT64_FORMAT " datalen: %zu data: ",
+					 loid, offset, datalen);
+
+	appendBinaryStringInfo(ctx->out, lodata, datalen);
+
+	/* For test_decoding, print the chunk length and the raw chunk data inline */
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
@@ -619,6 +651,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	txndata->xact_wrote_changes = true;
 
+	/* Handle large object changes independent of the table changes. */
+	if (change->action == REORDER_BUFFER_CHANGE_LOWRITE)
+	{
+		pg_decode_lo_write(ctx, change);
+		return;
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 32af1249610..2af7fb5fc88 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -26,6 +26,7 @@
  */
 #include "postgres.h"
 
+#include "access/detoast.h"
 #include "access/heapam_xlog.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -33,11 +34,13 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
 #include "catalog/pg_control.h"
+#include "catalog/pg_largeobject.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
+#include "storage/large_object.h"
 #include "storage/standbydefs.h"
 
 /* individual record(group)'s handlers */
@@ -56,6 +59,10 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						bool two_phase);
 static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						  xl_xact_parsed_prepare *parsed);
+static void DecodeLargeObjectInsert(LogicalDecodingContext *ctx,
+									XLogRecordBuffer *buf);
+static void DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx,
+									 XLogRecordBuffer *buf);
 
 
 /* common function to decode tuples */
@@ -466,6 +473,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
 	SnapBuild  *builder = ctx->snapshot_builder;
+	RelFileLocator target_locator;
+	XLogReaderState *r = buf->record;
 
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
@@ -480,6 +489,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+
+	/*
+	 * Check if the WAL record pertains to 'pg_largeobject'. If it does,
+	 * handle the large object changes separately via
+	 * DecodeLargeObjectChanges, bypassing the standard heap table decoding
+	 * logic that follows.
+	 */
+	if (target_locator.relNumber == LargeObjectRelationId)
+	{
+		if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+			!ctx->fast_forward)
+			DecodeLargeObjectChanges(info, ctx, buf);
+		return;
+	}
+
 	switch (info)
 	{
 		case XLOG_HEAP_INSERT:
@@ -1309,3 +1334,111 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	return false;
 }
+
+/*
+ * Helper function to decode a 'pg_largeobject' INSERT record into a
+ * 'REORDER_BUFFER_CHANGE_LOWRITE' change.
+ *
+ * Each row in 'pg_largeobject' represents only a small page (or chunk) of
+ * a large object's data. Logically, these individual page-level inserts
+ * are not meaningful on their own to a consumer. Therefore, instead of
+ * treating them as regular heap tuple changes, we convert the physical
+ * page insert into a single, more meaningful logical operation: a
+ * 'LO_WRITE' change, which can be applied as an independent large object
+ * operation.
+ */
+static void
+DecodeLargeObjectInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	ReorderBufferChange *change;
+	Size		datalen;
+	char		*tupledata;
+	HeapTuple	tuple;
+	bytea	   *data_chunk;
+	Oid			loid;
+	int32 		pageno;
+	int64		offset;
+	Size		chunk_datalen;
+	char	   *data_copy;
+	bool		freeit = false;
+	Form_pg_largeobject	largeobject;
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	if (datalen == 0)
+		return;
+
+	tuple = ReorderBufferAllocTupleBuf(ctx->reorder, datalen - SizeOfHeapHeader);
+	DecodeXLogTuple(tupledata, datalen, tuple);
+	largeobject = GETSTRUCT(tuple);
+
+	/* Fetch loid, pageno and actual data from the pg_largeobject tuple. */
+	loid = largeobject->loid;
+	pageno = largeobject->pageno;
+	data_chunk = &(largeobject->data);
+	if (VARATT_IS_EXTENDED(data_chunk))
+	{
+		data_chunk = (bytea *)
+			detoast_attr((struct varlena *) data_chunk);
+		freeit = true;
+	}
+	chunk_datalen = VARSIZE(data_chunk) - VARHDRSZ;
+
+	/*
+	 * Convert the single 'pg_largeobject' row (which represents a data page)
+	 * into a logical 'LOWRITE' operation. The absolute offset for this write
+	 * is computed by multiplying the page number ('pageno') by the fixed
+	 * large object block size (LOBLKSIZE).
+	 */
+	offset = (int64) pageno * LOBLKSIZE;
+	//chunk_datalen = VARSIZE_ANY_EXHDR(data_chunk);
+	data_copy = ReorderBufferAllocRawBuffer(ctx->reorder, chunk_datalen);
+	memcpy(data_copy, VARDATA(data_chunk), chunk_datalen);
+
+
+	/* Create the LOWRITE change */
+	change = ReorderBufferAllocChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_LOWRITE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	change->data.lo_write.loid = loid;
+	change->data.lo_write.offset = offset;
+	change->data.lo_write.datalen = chunk_datalen;
+	change->data.lo_write.data = data_copy;
+
+	/* Enqueue the change */
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+	ReorderBufferFreeTupleBuf(tuple);
+	if (freeit)
+		pfree(data_chunk);
+}
+
+/*
+ * Processes and decodes all logical changes for large objects (LOs).
+ * Since LO data is spread across 'pg_largeobject' rows, this function
+ * maps physical changes (INSERT/UPDATE) to a single logical 'LO_WRITE'
+ * operation.
+ *
+ * TODO: Temporarily ignoring LO_UNLINK (DELETE), which will be
+ * handled during a later phase.
+ */
+static void
+DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx,
+						 XLogRecordBuffer *buf)
+{
+	switch (info)
+	{
+		case XLOG_HEAP_INSERT:
+		case XLOG_HEAP_HOT_UPDATE:
+		case XLOG_HEAP_UPDATE:
+			DecodeLargeObjectInsert(ctx, buf);
+			break;
+		case XLOG_HEAP_DELETE:
+			/* LO_UNLINK (delete) is handled in a later phase */
+			break;
+		default:
+			/* Ignore other operations on pg_largeobject for now */
+			break;
+	}
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3950dd0cf46..bfc8678e3df 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -346,6 +346,45 @@ logicalrep_read_rollback_prepared(StringInfo in,
 	strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
 }
 
+void
+logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid,
+						  int64 offset, Size datalen, const char *data)
+{
+	pq_sendbyte(out, LOGICAL_REP_MSG_LOWRITE);
+
+	/* Write LO ID, offset, and data length */
+	pq_sendint32(out, loid);
+	pq_sendint64(out, offset);
+	pq_sendint32(out, datalen);
+
+	/* Write the data chunk */
+	pq_sendbytes(out, data, datalen);
+}
+
+void
+logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen,
+						 char **data)
+{
+	/* Read fields, incorporating validation */
+	*loid = pq_getmsgint(s, 4);
+	if (!OidIsValid(*loid))
+		elog(ERROR, "large object ID is not set in LO write message");
+
+	*offset = pq_getmsgint64(s);
+	if (*offset < 0)
+		elog(ERROR, "invalid offset " INT64_FORMAT " in LO write message", *offset);
+
+	*datalen = pq_getmsgint(s, 4);
+	if (*datalen < 0)
+		elog(ERROR, "invalid data length %zu in LO write message", *datalen);
+
+	/* Allocate memory for the data payload */
+	*data = palloc(*datalen);
+
+	/* Read the data payload directly into the new buffer */
+	pq_copymsgbytes(s, *data, *datalen);
+}
+
 /*
  * Write STREAM PREPARE to the output stream.
  */
@@ -1235,6 +1274,8 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "TYPE";
 		case LOGICAL_REP_MSG_MESSAGE:
 			return "MESSAGE";
+		case LOGICAL_REP_MSG_LOWRITE:
+			return "LOWRITE";
 		case LOGICAL_REP_MSG_BEGIN_PREPARE:
 			return "BEGIN PREPARE";
 		case LOGICAL_REP_MSG_PREPARE:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 2d2a6d5e9e7..a6d672ab19c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -579,6 +579,13 @@ ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			if (change->data.lo_write.data != NULL)
+			{
+				pfree(change->data.lo_write.data);
+				change->data.lo_write.data = NULL;
+			}
+			break;
 	}
 
 	pfree(change);
@@ -930,6 +937,19 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * Allocate raw memory from the reorder buffer.
+ */
+void *
+ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len)
+{
+	void *buffer;
+
+	buffer = (char *) MemoryContextAlloc(rb->tup_context, alloc_len);
+
+	return buffer;
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -2585,6 +2605,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_LOWRITE:
+					ReorderBufferApplyChange(rb, txn, NULL, change, streaming);
+					break;
 			}
 
 			/*
@@ -4267,6 +4291,26 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				char   *data;
+				Size	datalen = change->data.lo_write.datalen;
+
+				sz += datalen;
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/* Copy the data payload right after the ReorderBufferDiskChange header */
+				memcpy(data, change->data.lo_write.data, datalen);
+
+				break;
+			}
 	}
 
 	ondisk->size = sz;
@@ -4531,6 +4575,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				sz += change->data.lo_write.datalen;
+				break;
+			}
 	}
 
 	return sz;
@@ -4830,6 +4879,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				Size        datalen = change->data.lo_write.datalen;
+
+				/* Allocate memory for the data payload */
+				change->data.lo_write.data = MemoryContextAlloc(rb->context, datalen);
+
+				/* Copy the data payload */
+				memcpy(change->data.lo_write.data, data, datalen);
+
+				break;
+			}
 	}
 
 	dlist_push_tail(&txn->changes, &change->node);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e016f64e0b3..5eb5a0fa86d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1495,6 +1495,29 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
+	if (change->action == REORDER_BUFFER_CHANGE_LOWRITE)
+	{
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, txn);
+
+		/* Remember the xid for the change in streaming mode. */
+		if (data->in_streaming)
+			xid = change->txn->xid;
+
+		OutputPluginPrepareWrite(ctx, true);
+
+		/* Use the new helper to serialize the LO payload */
+		logicalrep_write_lo_write(ctx->out, xid,
+								  change->data.lo_write.loid,
+								  change->data.lo_write.offset,
+								  change->data.lo_write.datalen,
+								  change->data.lo_write.data);
+
+		OutputPluginWrite(ctx, true);
+
+		return;
+	}
+
 	if (!is_publishable_relation(relation))
 		return;
 
diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h
index 058a955e20c..ba65ee406be 100644
--- a/src/include/replication/logicalproto.h
+++ b/src/include/replication/logicalproto.h
@@ -66,6 +66,7 @@ typedef enum LogicalRepMsgType
 	LOGICAL_REP_MSG_RELATION = 'R',
 	LOGICAL_REP_MSG_TYPE = 'Y',
 	LOGICAL_REP_MSG_MESSAGE = 'M',
+	LOGICAL_REP_MSG_LOWRITE = 'W',
 	LOGICAL_REP_MSG_BEGIN_PREPARE = 'b',
 	LOGICAL_REP_MSG_PREPARE = 'P',
 	LOGICAL_REP_MSG_COMMIT_PREPARED = 'K',
@@ -214,6 +215,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN
 											   TimestampTz prepare_time);
 extern void logicalrep_read_rollback_prepared(StringInfo in,
 											  LogicalRepRollbackPreparedTxnData *rollback_data);
+extern void logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid,
+									  int64 offset, Size datalen, const char *data);
+extern void logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen,
+									 char **data);
 extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn,
 											XLogRecPtr prepare_lsn);
 extern void logicalrep_read_stream_prepare(StringInfo in,
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2d717a9e152..eb9b150e05f 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -61,6 +61,7 @@ typedef enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM,
 	REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT,
 	REORDER_BUFFER_CHANGE_TRUNCATE,
+	REORDER_BUFFER_CHANGE_LOWRITE,
 } ReorderBufferChangeType;
 
 /* forward declaration */
@@ -154,6 +155,16 @@ typedef struct ReorderBufferChange
 			uint32		ninvalidations; /* Number of messages */
 			SharedInvalidationMessage *invalidations;	/* invalidation message */
 		}			inval;
+
+		/* Lo write */
+		struct
+		{
+			Oid		loid;
+			int64	offset;
+			Size	datalen;
+			char   *data;
+		}			lo_write;
+
 	}			data;
 
 	/*
@@ -722,6 +733,7 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 									  Snapshot snap, XLogRecPtr lsn,
 									  bool transactional, const char *prefix,
 									  Size message_size, const char *message);
+extern void *ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len);
 extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 								XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 								TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn);
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 236830f6b93..ff68c38f75c 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -19,6 +19,7 @@
 #include "catalog/catalog.h"
 #include "catalog/pg_class.h"
 #include "catalog/pg_index.h"
+#include "catalog/pg_largeobject.h"
 #include "catalog/pg_publication.h"
 #include "nodes/bitmapset.h"
 #include "partitioning/partdefs.h"
@@ -706,12 +707,15 @@ RelationCloseSmgr(Relation relation)
  * it would complicate decoding slightly for little gain). Note that we *do*
  * log information for user defined catalog tables since they presumably are
  * interesting to the user...
+ *
+ * TODO: Logically log pg_largeobject rows with a configuration parameter
+ * instead of doing it unconditionally.
  */
 #define RelationIsLogicallyLogged(relation) \
 	(XLogLogicalInfoActive() && \
 	 RelationNeedsWAL(relation) && \
 	 (relation)->rd_rel->relkind != RELKIND_FOREIGN_TABLE &&	\
-	 !IsCatalogRelation(relation))
+	 !(IsCatalogRelation(relation) && RelationGetRelid(relation) != LargeObjectRelationId))
 
 /* routines in utils/cache/relcache.c */
 extern void RelationIncrementReferenceCount(Relation rel);
-- 
2.53.0.rc1.225.gd81095ad13-goog

From 413d02e042692999b0239c8039a95811491ceb77 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Mon, 2 Feb 2026 09:05:51 +0000
Subject: [PATCH v3 4/4] Enable tablesync for large objects for all users

* Instead of trying to copy pg_largeobject entries, we utilize lo_from_bytea to write the entries.

* The use of lo_from_bytea allows us to create the large object if it doesn't already exist.
  This way we can do the tablesync just using one worker instead of running a separate worker for
  pg_largeobject_metadata. Running one worker helps us avoid any synchronization issues which
  would come up with a separate sync worker creating the large objects and a separate one writing to them.

* This also allows the subscription owner to only replicate the large objects its counterpart owns on the
  publisher. This way multiple subscriptions can be created which each take care of only their own large objects.

* The above logic can also be added to the apply side of the code to only replicate the large
  objects owned by the subscription owner.
---
 src/backend/libpq/be-fsstubs.c              | 24 +++++++----
 src/backend/replication/logical/tablesync.c | 47 ++++++++++++++++++++-
 src/include/libpq/be-fsstubs.h              |  1 +
 3 files changed, 62 insertions(+), 10 deletions(-)

diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c
index ffabad9b708..9d3fbeb809e 100644
--- a/src/backend/libpq/be-fsstubs.c
+++ b/src/backend/libpq/be-fsstubs.c
@@ -216,6 +216,20 @@ lo_put(Oid loOid, int64 offset, const char *str, int len)
 	inv_close(loDesc);
 }
 
+void
+lo_from_bytea(Oid loOid, bytea *str)
+{
+	LargeObjectDesc *loDesc;
+	int			written PG_USED_FOR_ASSERTS_ONLY;
+
+	lo_cleanup_needed = true;
+	loOid = inv_create(loOid);
+	loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext);
+	written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
+	Assert(written == VARSIZE_ANY_EXHDR(str));
+	inv_close(loDesc);
+}
+
 Datum
 be_lo_lseek(PG_FUNCTION_ARGS)
 {
@@ -847,18 +861,10 @@ be_lo_from_bytea(PG_FUNCTION_ARGS)
 {
 	Oid			loOid = PG_GETARG_OID(0);
 	bytea	   *str = PG_GETARG_BYTEA_PP(1);
-	LargeObjectDesc *loDesc;
-	int			written PG_USED_FOR_ASSERTS_ONLY;
 
 	PreventCommandIfReadOnly("lo_from_bytea()");
 
-	lo_cleanup_needed = true;
-	loOid = inv_create(loOid);
-	loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext);
-	written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
-	Assert(written == VARSIZE_ANY_EXHDR(str));
-	inv_close(loDesc);
-
+	lo_from_bytea(loOid, str);
 	PG_RETURN_OID(loOid);
 }
 
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 19a3c21a863..62ea0ee416c 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -100,6 +100,7 @@
 #include "catalog/pg_subscription_rel.h"
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
+#include "libpq/be-fsstubs.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
 #include "parser/parse_relation.h"
@@ -1406,6 +1407,51 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 
 	replorigin_session_setup(originid, 0);
 	replorigin_xact_state.origin = originid;
+	if (rel->rd_id == LargeObjectRelationId)
+	{
+		TupleTableSlot *slot;
+		Oid			tableRow[2] = {OIDOID, BYTEAOID};
+
+		PushActiveSnapshot(GetTransactionSnapshot());
+		res = walrcv_exec(LogRepWorkerWalRcvConn, "SELECT m.oid as oid, lo_get(m.oid) FROM pg_largeobject_metadata as m JOIN pg_user as u ON m.lomowner = u.usesysid where u.usename=CURRENT_USER", 2, tableRow);
+		if (res->status != WALRCV_OK_TUPLES)
+			ereport(ERROR,
+					errmsg("could not fetch large object data from publisher"));
+		slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+		while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+		{
+			Oid			looid;
+			bytea	   *lodata;
+			bool		isnull;
+
+			looid = DatumGetObjectId(slot_getattr(slot, 1, &isnull));
+			Assert(!isnull);
+			lodata = (bytea *) (slot_getattr(slot, 2, &isnull));
+			lo_from_bytea(looid, lodata);
+			ExecClearTuple(slot);
+		}
+		ExecDropSingleTupleTableSlot(slot);
+		walrcv_clear_result(res);
+		PopActiveSnapshot();
+
+		res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
+		if (res->status != WALRCV_OK_COMMAND)
+			ereport(ERROR,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("table copy could not finish transaction on publisher: %s",
+							res->err)));
+
+		walrcv_clear_result(res);
+		table_close(rel, NoLock);
+		CommandCounterIncrement();
+		UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
+								   MyLogicalRepWorker->relid,
+								   SUBREL_STATE_FINISHEDCOPY,
+								   MyLogicalRepWorker->relstate_lsn,
+								   false);
+		CommitTransactionCommand();
+		goto copy_table_done;
+	}
 
 	/*
 	 * If the user did not opt to run as the owner of the subscription
@@ -1474,7 +1520,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
 	CommitTransactionCommand();
 
 copy_table_done:
-
 	elog(DEBUG1,
 		 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
 		 originname, LSN_FORMAT_ARGS(*origin_startpos));
diff --git a/src/include/libpq/be-fsstubs.h b/src/include/libpq/be-fsstubs.h
index 39e4e751b45..11c1e29efbc 100644
--- a/src/include/libpq/be-fsstubs.h
+++ b/src/include/libpq/be-fsstubs.h
@@ -22,6 +22,7 @@
 extern int	lo_read(int fd, char *buf, int len);
 extern int	lo_write(int fd, const char *buf, int len);
 extern void lo_put(Oid loOid, int64 offset, const char *str, int len);
+extern void lo_from_bytea(Oid loOid, bytea *str);
 
 /*
  * Cleanup LOs at xact commit/abort
-- 
2.53.0.rc1.225.gd81095ad13-goog
