Hi Dilip,

I have created a couple of POC patches for the tablesync side of this. The main challenge with tablesync is permissions, because pg_largeobject is owned by the superuser. I considered a few solutions and have patches for a couple of them.

1. One simple solution is to allow only the superuser to set up large object replication. This could be enabled with a command that turns on large object replication for a publication, or maybe a GUC flag. Ideally we don't want every publication to publish large objects, so a publication-level command might be preferred. While the enablement mechanism is still an open question, I have implemented the POC by adding the pg_largeobject table to the result of pg_get_publication_tables; in the final implementation we might also add pg_largeobject_metadata. The patch is attached as POC-v3-0003-Tablesync-for-large-objects-for-superuser.patch. An alternative would be to handle this special case on the subscriber side, but it is simpler to do it on the publisher side; we might also end up needing setup on both the publisher and subscriber sides. For the full enablement we'll need this patch, the existing apply worker patch for lo_write, and a patch supporting lo_create on the apply worker side. A rough sketch of how the enablement could look is below.
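To make the enablement question concrete, here is a minimal sketch of the publisher-side flow. The publication name lo_pub is arbitrary and the publish_large_objects option is purely hypothetical (nothing in the POC implements it); only the pg_get_publication_tables check reflects what POC-v3-0003 actually changes, since the POC adds pg_largeobject to all publications unconditionally.

    -- Any publication works for the POC, since the patch adds pg_largeobject
    -- to every publication's table list (superuser required for FOR ALL TABLES):
    CREATE PUBLICATION lo_pub FOR ALL TABLES;

    -- Hypothetical final-form enablement, not implemented in the POC:
    -- ALTER PUBLICATION lo_pub SET (publish_large_objects = true);

    -- With POC-v3-0003 applied, pg_largeobject should show up in the table list
    -- that the subscriber fetches:
    SELECT relid::regclass FROM pg_get_publication_tables('lo_pub');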
2. An alternative solution would be to allow a subscription owner to copy only the large object entries owned by its counterpart on the publisher. This allows all users to replicate large objects, and it also lets multiple subscriptions set up large object replication. The enablement question from the first approach still remains open, and we can again assume that a patch supporting lo_create on the apply worker will be created along with this change. For the implementation, we can have two tablesync workers: one for pg_largeobject_metadata and one for pg_largeobject. The pg_largeobject_metadata worker can use lo_create to create the large object entries, or it can simply copy the rows, since permissions are not an issue for that table. The pg_largeobject worker will then use lo_write/lo_put to insert the entries owned by it. The major challenge with this approach is the synchronization required between the pg_largeobject and pg_largeobject_metadata tablesync workers. For a plain table copy (as suggested for superusers in the first approach), integrity checks are turned off, so the two tablesync workers should have no issue running in parallel. However, explicitly using lo_write checks for the large object's existence, which can lead to failures.

3. To avoid the synchronization problem above, I experimented with a modified approach. Instead of having a separate tablesync worker for pg_largeobject_metadata, I create only one tablesync worker, for pg_largeobject. It differs from a regular tablesync worker in the following ways:

a. Instead of a copy command, it runs

    select m.oid, lo_get(m.oid)
    from pg_largeobject_metadata as m
    join pg_user as u on m.lomowner = u.usesysid
    where u.usename = CURRENT_USER;

which returns all large objects owned by the user.

b. It then uses lo_from_bytea (a refactored version which doesn't need fmgr) to both create and write the large object, which removes the need for a separate lo_create.

POC-v3-0004-Enable-tablesync-for-large-objects-for-all-users.patch contains this implementation. I tested it successfully for a small set of large objects. This patch, along with support for lo_create in the apply worker, should provide full support for large object replication (a short sketch of the per-row apply step is below).

Note: We should also modify the apply worker code to replicate only the lo_writes on objects owned by the subscription user. I have not made that change in the POC but can do it in the next version.
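To make (3b) concrete: each (oid, data) row returned by the query in (3a) is applied on the subscriber through the refactored lo_from_bytea. The SQL-callable lo_from_bytea wraps the same code, so the per-row work is roughly equivalent to the following; the OID 16970 and the payload are placeholders, the worker passes the fetched values directly to the C-level function.

    -- Creates large object 16970 if it does not exist and writes the payload,
    -- so no separate lo_create step is needed:
    SELECT lo_from_bytea(16970, convert_to('large object test data', 'UTF8'));

    -- Quick sanity check that the contents arrived:
    SELECT lo_get(16970);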
One major concern here is the performance of tablesync. I think most users will set up replication at the start, or convert a physical replica to a logical one, so this cost might not be borne in many cases.

4. If we want a more performant version, one idea is to support bulk writes for large objects; the above solution could then be made faster. I have not analyzed the work required. Suggestions on this would be welcome.

Thanks.
Nitin Motiani
Google
From 678ce86f4294e0c6bce9f67cc00437c045b9dc43 Mon Sep 17 00:00:00 2001 From: Nitin Motiani <[email protected]> Date: Mon, 2 Feb 2026 09:00:36 +0000 Subject: [PATCH v3 3/4] Tablesync for large objects for superuser * Add the large object table to the output of pg_get_publication_tables. * This approach ensures that it is handled without any special logic on the subscriber side. * For POC, we have only added pg_largeobject. We can do the same for pg_largeobject_metadata. * Due to the permissions around pg_largeobject, this will only work for superusers. * In the final implementation this behaviour will be controlled by a flag or some other method which enables the large objects replication. For POC, we have done it for all publications. --- src/backend/catalog/pg_publication.c | 8 ++++++++ src/backend/commands/subscriptioncmds.c | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 9a4791c573e..f68fef6c4f3 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -1186,6 +1186,14 @@ pg_get_publication_tables(PG_FUNCTION_ARGS) pub_elem_tables = list_concat_unique_oid(relids, schemarelids); } + /* + * TODO : For poc, just add pg_largeobject relid to the + * publication. We need to figure out the long term approach to do + * this and also decide which publication would publish the large + * objects. + */ + pub_elem_tables = lappend_oid(pub_elem_tables, LargeObjectRelationId); + /* * Record the published table and the corresponding publication so * that we can get row filters and column lists later. diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 0b3c8499b49..3014b66ec91 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -2987,6 +2987,15 @@ fetch_relation_list(WalReceiverConn *wrconn, List *publications) walrcv_clear_result(res); + /* + * TODO : Might need a check or special code here to include the + * largeobject tables. In the POC, we are chaning + * pg_get_publication_tables. But that might change. We also might want to + * have a combination of both - the publication provides the large object + * tables and the subscription has a flag to decide if it wants to + * replicate large objects or not. + */ + return relationlist; } -- 2.53.0.rc1.225.gd81095ad13-goog
From e354f683c01a03a0fb4d309f8b66f2228aa7091b Mon Sep 17 00:00:00 2001 From: Nitin Motiani <[email protected]> Date: Wed, 31 Dec 2025 09:09:20 +0000 Subject: [PATCH v3 2/4] Add support to apply lo_write operations in applyworker * Handle REORDER_BUFFER_CHANGE_LOWRITE operations on apply worker. * This is done by using lo_put. This patch refactors lo_put to provide a version which is not fmgr-callable but is available to the C code. * This patch currently does not handle large objects in tablesync. --- src/backend/libpq/be-fsstubs.c | 24 +++++++++++++++--------- src/backend/replication/logical/worker.c | 21 +++++++++++++++++++++ src/include/libpq/be-fsstubs.h | 1 + 3 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c index f27e374c4ee..ffabad9b708 100644 --- a/src/backend/libpq/be-fsstubs.c +++ b/src/backend/libpq/be-fsstubs.c @@ -202,6 +202,20 @@ lo_write(int fd, const char *buf, int len) return status; } +void +lo_put(Oid loOid, int64 offset, const char *str, int len) +{ + LargeObjectDesc *loDesc; + int written PG_USED_FOR_ASSERTS_ONLY; + + lo_cleanup_needed = true; + loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext); + inv_seek(loDesc, offset, SEEK_SET); + written = inv_write(loDesc, str, len); + Assert(written == len); + inv_close(loDesc); +} + Datum be_lo_lseek(PG_FUNCTION_ARGS) { @@ -857,17 +871,9 @@ be_lo_put(PG_FUNCTION_ARGS) Oid loOid = PG_GETARG_OID(0); int64 offset = PG_GETARG_INT64(1); bytea *str = PG_GETARG_BYTEA_PP(2); - LargeObjectDesc *loDesc; - int written PG_USED_FOR_ASSERTS_ONLY; PreventCommandIfReadOnly("lo_put()"); - lo_cleanup_needed = true; - loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext); - inv_seek(loDesc, offset, SEEK_SET); - written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str)); - Assert(written == VARSIZE_ANY_EXHDR(str)); - inv_close(loDesc); - + lo_put(loOid, offset, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str)); PG_RETURN_VOID(); } diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 32725c48623..88701da7717 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -261,6 +261,7 @@ #include "commands/trigger.h" #include "executor/executor.h" #include "executor/execPartition.h" +#include "libpq/be-fsstubs.h" #include "libpq/pqformat.h" #include "miscadmin.h" #include "optimizer/optimizer.h" @@ -3163,6 +3164,23 @@ apply_handle_delete_internal(ApplyExecutionData *edata, EvalPlanQualEnd(&epqstate); } +static void +apply_handle_lowrite(StringInfo s) +{ + Oid loid; + int64 offset; + Size datalen; + char *data; + + if (handle_streamed_transaction(LOGICAL_REP_MSG_LOWRITE, s)) + return; + + begin_replication_step(); + logicalrep_read_lo_write(s, &loid, &offset, &datalen, &data); + lo_put(loid, offset, data, datalen); + end_replication_step(); +} + /* * Try to find a tuple received from the publication side (in 'remoteslot') in * the corresponding local relation using either replica identity index, @@ -3868,6 +3886,9 @@ apply_dispatch(StringInfo s) apply_handle_stream_prepare(s); break; + case LOGICAL_REP_MSG_LOWRITE: + apply_handle_lowrite(s); + break; default: ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), diff --git a/src/include/libpq/be-fsstubs.h b/src/include/libpq/be-fsstubs.h index 8775939f410..39e4e751b45 100644 --- a/src/include/libpq/be-fsstubs.h +++ b/src/include/libpq/be-fsstubs.h @@ -21,6 +21,7 @@ */ extern int lo_read(int fd, char *buf, int len); extern 
int lo_write(int fd, const char *buf, int len); +extern void lo_put(Oid loOid, int64 offset, const char *str, int len); /* * Cleanup LOs at xact commit/abort -- 2.53.0.rc1.225.gd81095ad13-goog
From a9971b7d0c0e49d3b34e260521768bb8c6fe7ee4 Mon Sep 17 00:00:00 2001 From: Dilip Kumar <[email protected]> Date: Mon, 29 Dec 2025 09:37:52 +0000 Subject: [PATCH v3 1/4] Support large object decoding Introduce support for decoding changes to large objects in logical replication. Changes to 'pg_largeobject' are now intercepted in 'heap_decode' based on LargeObjectRelationId. Since a single large object operation (LO_WRITE) spans multiple physical rows in 'pg_largeobject', the changes are decoded and converted into a dedicated logical operation: REORDER_BUFFER_CHANGE_LOWRITE. --- contrib/test_decoding/Makefile | 2 +- .../expected/decoding_largeobject.out | 216 ++++++++++++++++++ .../sql/decoding_largeobject.sql | 94 ++++++++ contrib/test_decoding/test_decoding.c | 39 ++++ src/backend/replication/logical/decode.c | 133 +++++++++++ src/backend/replication/logical/proto.c | 41 ++++ .../replication/logical/reorderbuffer.c | 61 +++++ src/backend/replication/pgoutput/pgoutput.c | 23 ++ src/include/replication/logicalproto.h | 5 + src/include/replication/reorderbuffer.h | 12 + src/include/utils/rel.h | 6 +- 11 files changed, 630 insertions(+), 2 deletions(-) create mode 100644 contrib/test_decoding/expected/decoding_largeobject.out create mode 100644 contrib/test_decoding/sql/decoding_largeobject.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index acbcaed2feb..d1f02500cf3 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats twophase twophase_stream + spill slot truncate stream stats twophase twophase_stream decoding_largeobject ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot \ diff --git a/contrib/test_decoding/expected/decoding_largeobject.out b/contrib/test_decoding/expected/decoding_largeobject.out new file mode 100644 index 00000000000..a2720d82064 --- /dev/null +++ b/contrib/test_decoding/expected/decoding_largeobject.out @@ -0,0 +1,216 @@ +-- test that we can insert into the large objects and decode the changes +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? 
+---------- + init +(1 row) + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- Create a new large object +CREATE TABLE lotest_stash_values (loid oid, fd integer); +INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42); +-- NOTE: large objects require transactions +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values; + lowrite +--------- + 22 +(1 row) + +SELECT lo_close(fd) FROM lotest_stash_values; + lo_close +---------- + 0 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: INSERT: loid[oid]:16970 fd[integer]:null + COMMIT + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 22 data: large object test data + COMMIT +(7 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values; + lo_lseek +---------- + 10 +(1 row) + +SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values; + lowrite +--------- + 19 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 29 data: large objeoverwrite some data + COMMIT +(4 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values; + lo_lseek +---------- + 2048 +(1 row) + +SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values; + lowrite +--------- + 19 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 2048 datalen: 19 data: write into new page + COMMIT +(4 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, ' +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 
2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +') FROM lotest_stash_values; + lowrite +--------- + 3829 +(1 row) + +SELECT lo_close(fd) FROM lotest_stash_values; + lo_close +---------- + 0 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 2048 data: + + test large data more in 
2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data m + LO_WRITE: loid: 16970 offset: 2048 datalen: 1781 data: ore in 2048 test large data more in 2048+ + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test 
large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + + COMMIT +(5 rows) + +-- Clean up the slot +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/decoding_largeobject.sql b/contrib/test_decoding/sql/decoding_largeobject.sql new file mode 100644 index 00000000000..ff392de2fba --- /dev/null +++ b/contrib/test_decoding/sql/decoding_largeobject.sql @@ -0,0 +1,94 @@ +-- test that we can insert into the large objects and decode the changes + +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Create a new large object +CREATE TABLE lotest_stash_values (loid oid, fd integer); + +INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42); + +-- NOTE: large objects require transactions +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values; +SELECT lo_close(fd) FROM lotest_stash_values; +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values; +SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values; +END; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values; +SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values; +END; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, ' +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 
test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +') FROM lotest_stash_values; +SELECT lo_close(fd) FROM lotest_stash_values; +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + + +-- Clean up the slot +SELECT pg_drop_replication_slot('regression_slot'); \ No newline at end of file diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index d5cf0fa02b0..be3f9c860dd 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -470,6 +470,38 @@ pg_decode_filter(LogicalDecodingContext 
*ctx, return false; } +static void +pg_decode_lo_write(LogicalDecodingContext *ctx, + ReorderBufferChange *change) +{ + TestDecodingData *data; + MemoryContext old; + Oid loid = change->data.lo_write.loid; + int64 offset = change->data.lo_write.offset; + Size datalen = change->data.lo_write.datalen; + char *lodata = change->data.lo_write.data; + + data = ctx->output_plugin_private; + + /* Avoid leaking memory by using and resetting our own context */ + old = MemoryContextSwitchTo(data->context); + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "LO_WRITE:"); + appendStringInfo(ctx->out, " loid: %u offset: " INT64_FORMAT " datalen: %zu data: ", + loid, offset, datalen); + + appendBinaryStringInfo(ctx->out, lodata, datalen); + + /* For test_decoding, we print the data length but typically skip the binary data itself */ + + MemoryContextSwitchTo(old); + MemoryContextReset(data->context); + + OutputPluginWrite(ctx, true); +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. @@ -619,6 +651,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } txndata->xact_wrote_changes = true; + /* Handle large object changes independent of the table changes. */ + if (change->action == REORDER_BUFFER_CHANGE_LOWRITE) + { + pg_decode_lo_write(ctx, change); + return; + } + class_form = RelationGetForm(relation); tupdesc = RelationGetDescr(relation); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 32af1249610..2af7fb5fc88 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -26,6 +26,7 @@ */ #include "postgres.h" +#include "access/detoast.h" #include "access/heapam_xlog.h" #include "access/transam.h" #include "access/xact.h" @@ -33,11 +34,13 @@ #include "access/xlogreader.h" #include "access/xlogrecord.h" #include "catalog/pg_control.h" +#include "catalog/pg_largeobject.h" #include "replication/decode.h" #include "replication/logical.h" #include "replication/message.h" #include "replication/reorderbuffer.h" #include "replication/snapbuild.h" +#include "storage/large_object.h" #include "storage/standbydefs.h" /* individual record(group)'s handlers */ @@ -56,6 +59,10 @@ static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, bool two_phase); static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_prepare *parsed); +static void DecodeLargeObjectInsert(LogicalDecodingContext *ctx, + XLogRecordBuffer *buf); +static void DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx, + XLogRecordBuffer *buf); /* common function to decode tuples */ @@ -466,6 +473,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + RelFileLocator target_locator; + XLogReaderState *r = buf->record; ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); @@ -480,6 +489,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; + XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL); + + /* + * Check if the WAL record pertains to 'pg_largeobject'. If it does, + * handle the large object changes separately via + * DecodeLargeObjectChanges, bypassing the standard heap table decoding + * logic that follows. 
+ */ + if (target_locator.relNumber == LargeObjectRelationId) + { + if (SnapBuildProcessChange(builder, xid, buf->origptr) && + !ctx->fast_forward) + DecodeLargeObjectChanges(info, ctx, buf); + return; + } + switch (info) { case XLOG_HEAP_INSERT: @@ -1309,3 +1334,111 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, return false; } + +/* + * Helper function to decode a 'pg_largeobject' INSERT record into a + * 'REORDER_BUFFER_CHANGE_LOWRITE' change. + * + * Each row in 'pg_largeobject' represents only a small page (or chunk) of + * a large object's data. Logically, these individual page-level inserts + * are not meaningful on their own to a consumer. Therefore, instead of + * treating them as regular heap tuple changes, we convert the physical + * page insert into a single, more meaningful logical operation: a + * 'LO_WRITE' change, which can be applied as an independent large object + * operation. + */ +static void +DecodeLargeObjectInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + ReorderBufferChange *change; + Size datalen; + char *tupledata; + HeapTuple tuple; + bytea *data_chunk; + Oid loid; + int32 pageno; + int64 offset; + Size chunk_datalen; + char *data_copy; + bool freeit = false; + Form_pg_largeobject largeobject; + + tupledata = XLogRecGetBlockData(r, 0, &datalen); + if (datalen == 0) + return; + + tuple = ReorderBufferAllocTupleBuf(ctx->reorder, datalen - SizeOfHeapHeader); + DecodeXLogTuple(tupledata, datalen, tuple); + largeobject = GETSTRUCT(tuple); + + /* Fetch loid, pageno and actual data from the pg_largeobject tuple. */ + loid = largeobject->loid; + pageno = largeobject->pageno; + data_chunk = &(largeobject->data); + if (VARATT_IS_EXTENDED(data_chunk)) + { + data_chunk = (bytea *) + detoast_attr((struct varlena *) data_chunk); + freeit = true; + } + chunk_datalen = VARSIZE(data_chunk) - VARHDRSZ; + + /* + * Convert the single 'pg_largeobject' row (which represents a data page) + * into a logical 'LOWRITE' operation. The absolute offset for this write + * is computed by multiplying the page number ('pageno') by the fixed + * large object block size (LOBLKSIZE). + */ + offset = (int64) pageno * LOBLKSIZE; + //chunk_datalen = VARSIZE_ANY_EXHDR(data_chunk); + data_copy = ReorderBufferAllocRawBuffer(ctx->reorder, chunk_datalen); + memcpy(data_copy, VARDATA(data_chunk), chunk_datalen); + + + /* Create the LOWRITE change */ + change = ReorderBufferAllocChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_LOWRITE; + change->origin_id = XLogRecGetOrigin(r); + + change->data.lo_write.loid = loid; + change->data.lo_write.offset = offset; + change->data.lo_write.datalen = chunk_datalen; + change->data.lo_write.data = data_copy; + + /* Enqueue the change */ + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, + change, false); + ReorderBufferFreeTupleBuf(tuple); + if (freeit) + pfree(data_chunk); +} + +/* + * Processes and decodes all logical changes for large objects (LOs). + * Since LO data is spread across 'pg_largeobject' rows, this function + * maps physical changes (INSERT/UPDATE) to a single logical 'LO_WRITE' + * operation. + * + * TODO: Temporarily ignoring LO_UNLINK (DELETE), which will be + * handled during a later phase. 
+ */ +static void +DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx, + XLogRecordBuffer *buf) +{ + switch (info) + { + case XLOG_HEAP_INSERT: + case XLOG_HEAP_HOT_UPDATE: + case XLOG_HEAP_UPDATE: + DecodeLargeObjectInsert(ctx, buf); + break; + case XLOG_HEAP_DELETE: + /* LO_UNLINK (delete) is handled in a later phase */ + break; + default: + /* Ignore other operations on pg_largeobject for now */ + break; + } +} diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 3950dd0cf46..bfc8678e3df 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -346,6 +346,45 @@ logicalrep_read_rollback_prepared(StringInfo in, strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid)); } +void +logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid, + int64 offset, Size datalen, const char *data) +{ + pq_sendbyte(out, LOGICAL_REP_MSG_LOWRITE); + + /* Write LO ID, offset, and data length */ + pq_sendint32(out, loid); + pq_sendint64(out, offset); + pq_sendint32(out, datalen); + + /* Write the data chunk */ + pq_sendbytes(out, data, datalen); +} + +void +logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen, + char **data) +{ + /* Read fields, incorporating validation */ + *loid = pq_getmsgint(s, 4); + if (!OidIsValid(*loid)) + elog(ERROR, "large object ID is not set in LO write message"); + + *offset = pq_getmsgint64(s); + if (*offset < 0) + elog(ERROR, "invalid offset " INT64_FORMAT " in LO write message", *offset); + + *datalen = pq_getmsgint(s, 4); + if (*datalen < 0) + elog(ERROR, "invalid data length %zu in LO write message", *datalen); + + /* Allocate memory for the data payload */ + *data = palloc(*datalen); + + /* Read the data payload directly into the new buffer */ + pq_copymsgbytes(s, *data, *datalen); +} + /* * Write STREAM PREPARE to the output stream. */ @@ -1235,6 +1274,8 @@ logicalrep_message_type(LogicalRepMsgType action) return "TYPE"; case LOGICAL_REP_MSG_MESSAGE: return "MESSAGE"; + case LOGICAL_REP_MSG_LOWRITE: + return "LOWRITE"; case LOGICAL_REP_MSG_BEGIN_PREPARE: return "BEGIN PREPARE"; case LOGICAL_REP_MSG_PREPARE: diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 2d2a6d5e9e7..a6d672ab19c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -579,6 +579,13 @@ ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_LOWRITE: + if (change->data.lo_write.data != NULL) + { + pfree(change->data.lo_write.data); + change->data.lo_write.data = NULL; + } + break; } pfree(change); @@ -930,6 +937,19 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, } } +/* + * Allocate a raw memory from reorder buffer. 
+ */ +void * +ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len) +{ + void *buffer; + + buffer = (char *) MemoryContextAlloc(rb->tup_context, alloc_len); + + return buffer; +} + /* * AssertTXNLsnOrder * Verify LSN ordering of transaction lists in the reorderbuffer @@ -2585,6 +2605,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: elog(ERROR, "tuplecid value in changequeue"); break; + + case REORDER_BUFFER_CHANGE_LOWRITE: + ReorderBufferApplyChange(rb, txn, NULL, change, streaming); + break; } /* @@ -4267,6 +4291,26 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ break; + case REORDER_BUFFER_CHANGE_LOWRITE: + { + char *data; + Size datalen = change->data.lo_write.datalen; + + sz += datalen; + + /* make sure we have enough space */ + ReorderBufferSerializeReserve(rb, sz); + + data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); + + /* might have been reallocated above */ + ondisk = (ReorderBufferDiskChange *) rb->outbuf; + + /* Copy the LO_WRITE struct and the data payload immediately following it */ + memcpy(data, &change->data.lo_write.data, datalen); + + break; + } } ondisk->size = sz; @@ -4531,6 +4575,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: /* ReorderBufferChange contains everything important */ break; + case REORDER_BUFFER_CHANGE_LOWRITE: + { + sz += change->data.lo_write.datalen; + break; + } } return sz; @@ -4830,6 +4879,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: break; + case REORDER_BUFFER_CHANGE_LOWRITE: + { + Size datalen = change->data.lo_write.datalen; + + /* Allocate memory for the data payload */ + change->data.lo_write.data = MemoryContextAlloc(rb->context, datalen); + + /* Copy the data payload */ + memcpy(change->data.lo_write.data, data, datalen); + + break; + } } dlist_push_tail(&txn->changes, &change->node); diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e016f64e0b3..5eb5a0fa86d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -1495,6 +1495,29 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; + if (change->action == REORDER_BUFFER_CHANGE_LOWRITE) + { + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + + /* Remember the xid for the change in streaming mode. 
*/ + if (data->in_streaming) + xid = change->txn->xid; + + OutputPluginPrepareWrite(ctx, true); + + /* Use the new helper to serialize the LO payload */ + logicalrep_write_lo_write(ctx->out, xid, + change->data.lo_write.loid, + change->data.lo_write.offset, + change->data.lo_write.datalen, + change->data.lo_write.data); + + OutputPluginWrite(ctx, true); + + return; + } + if (!is_publishable_relation(relation)) return; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 058a955e20c..ba65ee406be 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -66,6 +66,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_LOWRITE = 'W', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -214,6 +215,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN TimestampTz prepare_time); extern void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data); +extern void logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid, + int64 offset, Size datalen, const char *data); +extern void logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen, + char **data); extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); extern void logicalrep_read_stream_prepare(StringInfo in, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 2d717a9e152..eb9b150e05f 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -61,6 +61,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_LOWRITE, } ReorderBufferChangeType; /* forward declaration */ @@ -154,6 +155,16 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Lo write */ + struct + { + Oid loid; + int64 offset; + Size datalen; + char *data; + } lo_write; + } data; /* @@ -722,6 +733,7 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +extern void *ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len); extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, ReplOriginId origin_id, XLogRecPtr origin_lsn); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 236830f6b93..ff68c38f75c 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -19,6 +19,7 @@ #include "catalog/catalog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" +#include "catalog/pg_largeobject.h" #include "catalog/pg_publication.h" #include "nodes/bitmapset.h" #include "partitioning/partdefs.h" @@ -706,12 +707,15 @@ RelationCloseSmgr(Relation relation) * it would complicate decoding slightly for little gain). Note that we *do* * log information for user defined catalog tables since they presumably are * interesting to the user... 
+ * + * TODO: Logically log pg_largeobject rows with a configuration parameter + * instead of doing it unconditionally. */ #define RelationIsLogicallyLogged(relation) \ (XLogLogicalInfoActive() && \ RelationNeedsWAL(relation) && \ (relation)->rd_rel->relkind != RELKIND_FOREIGN_TABLE && \ - !IsCatalogRelation(relation)) + !(IsCatalogRelation(relation) && RelationGetRelid(relation) != LargeObjectRelationId)) /* routines in utils/cache/relcache.c */ extern void RelationIncrementReferenceCount(Relation rel); -- 2.53.0.rc1.225.gd81095ad13-goog
From 413d02e042692999b0239c8039a95811491ceb77 Mon Sep 17 00:00:00 2001 From: Nitin Motiani <[email protected]> Date: Mon, 2 Feb 2026 09:05:51 +0000 Subject: [PATCH v3 4/4] Enable tablesync for large objects for all users * Instead of trying to copy pg_largeobject entries, we utilize lo_from_bytea to write the entries. * The use of lo_from_bytea allows us to create the large object if it doesn't already exist. This way we can do the tablesync just using one worker instead of running a separate worker for pg_largeobject_metadata. Running one worker helps us avoid any synchronization issues which would come up with a separate sync worker creating the large objects and a separate one writing to them. * This also allows the subscription owner to only replicate the large objects its counterpart owns on the publisher. This way multiple subscriptions can be created which only take care of only their own large objects. * The above logic can also be added to the apply side of the code to only replicate the large objects owned by the subscription owner. --- src/backend/libpq/be-fsstubs.c | 24 +++++++---- src/backend/replication/logical/tablesync.c | 47 ++++++++++++++++++++- src/include/libpq/be-fsstubs.h | 1 + 3 files changed, 62 insertions(+), 10 deletions(-) diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c index ffabad9b708..9d3fbeb809e 100644 --- a/src/backend/libpq/be-fsstubs.c +++ b/src/backend/libpq/be-fsstubs.c @@ -216,6 +216,20 @@ lo_put(Oid loOid, int64 offset, const char *str, int len) inv_close(loDesc); } +void +lo_from_bytea(Oid loOid, bytea *str) +{ + LargeObjectDesc *loDesc; + int written PG_USED_FOR_ASSERTS_ONLY; + + lo_cleanup_needed = true; + loOid = inv_create(loOid); + loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext); + written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str)); + Assert(written == VARSIZE_ANY_EXHDR(str)); + inv_close(loDesc); +} + Datum be_lo_lseek(PG_FUNCTION_ARGS) { @@ -847,18 +861,10 @@ be_lo_from_bytea(PG_FUNCTION_ARGS) { Oid loOid = PG_GETARG_OID(0); bytea *str = PG_GETARG_BYTEA_PP(1); - LargeObjectDesc *loDesc; - int written PG_USED_FOR_ASSERTS_ONLY; PreventCommandIfReadOnly("lo_from_bytea()"); - lo_cleanup_needed = true; - loOid = inv_create(loOid); - loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext); - written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str)); - Assert(written == VARSIZE_ANY_EXHDR(str)); - inv_close(loDesc); - + lo_from_bytea(loOid, str); PG_RETURN_OID(loOid); } diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 19a3c21a863..62ea0ee416c 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -100,6 +100,7 @@ #include "catalog/pg_subscription_rel.h" #include "catalog/pg_type.h" #include "commands/copy.h" +#include "libpq/be-fsstubs.h" #include "miscadmin.h" #include "nodes/makefuncs.h" #include "parser/parse_relation.h" @@ -1406,6 +1407,51 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) replorigin_session_setup(originid, 0); replorigin_xact_state.origin = originid; + if (rel->rd_id == LargeObjectRelationId) + { + TupleTableSlot *slot; + Oid tableRow[2] = {OIDOID, BYTEAOID}; + + PushActiveSnapshot(GetTransactionSnapshot()); + res = walrcv_exec(LogRepWorkerWalRcvConn, "SELECT m.oid as oid, lo_get(m.oid) FROM pg_largeobject_metadata as m JOIN pg_user as u ON m.lomowner = u.usesysid where u.usename=CURRENT_USER", 2, tableRow); + if (res->status != 
WALRCV_OK_TUPLES) + ereport(ERROR, + errmsg("could not run large objects command.")); + slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) + { + Oid looid; + bytea *lodata; + bool isnull; + + looid = DatumGetObjectId(slot_getattr(slot, 1, &isnull)); + Assert(!isnull); + lodata = (bytea *) (slot_getattr(slot, 2, &isnull)); + lo_from_bytea(looid, lodata); + ExecClearTuple(slot); + } + ExecDropSingleTupleTableSlot(slot); + walrcv_clear_result(res); + PopActiveSnapshot(); + + res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL); + if (res->status != WALRCV_OK_COMMAND) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("table copy could not finish transaction on publisher: %s", + res->err))); + + walrcv_clear_result(res); + table_close(rel, NoLock); + CommandCounterIncrement(); + UpdateSubscriptionRelState(MyLogicalRepWorker->subid, + MyLogicalRepWorker->relid, + SUBREL_STATE_FINISHEDCOPY, + MyLogicalRepWorker->relstate_lsn, + false); + CommitTransactionCommand(); + goto copy_table_done; + } /* * If the user did not opt to run as the owner of the subscription @@ -1474,7 +1520,6 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CommitTransactionCommand(); copy_table_done: - elog(DEBUG1, "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X", originname, LSN_FORMAT_ARGS(*origin_startpos)); diff --git a/src/include/libpq/be-fsstubs.h b/src/include/libpq/be-fsstubs.h index 39e4e751b45..11c1e29efbc 100644 --- a/src/include/libpq/be-fsstubs.h +++ b/src/include/libpq/be-fsstubs.h @@ -22,6 +22,7 @@ extern int lo_read(int fd, char *buf, int len); extern int lo_write(int fd, const char *buf, int len); extern void lo_put(Oid loOid, int64 offset, const char *str, int len); +extern void lo_from_bytea(Oid loOid, bytea *str); /* * Cleanup LOs at xact commit/abort -- 2.53.0.rc1.225.gd81095ad13-goog
