Hi Dilip,

Thanks for initiating this work. I have created a POC for the apply worker on top of your patch. I refactored lo_put to provide a version that can be called from C code without going through fmgr, and used that to apply the lo_write operations coming from the publisher. I tested this manually along with your patch, and it works as long as lo_create has been called beforehand on the subscriber with the same OID; a sketch of the manual test is below. The patch doesn't handle the tablesync worker yet, so it can only apply lo_write operations performed after the replication slot is created. I'm looking into how to support large objects in tablesync. PFA the patches, and let me know what you think.
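To make the test reproducible, this is roughly what I ran. The OID 16970 is just an example; any free OID works as long as it matches on both sides, and a working publication/subscription pair is assumed to exist already.

On the subscriber, pre-create the large object so the applied lo_put has something to open:

    SELECT lo_create(16970);

On the publisher:

    SELECT lo_create(16970);
    SELECT lo_put(16970, 0, 'replicated large object data');

Then, once the change has been applied, the contents can be compared by running this on both sides:

    SELECT md5(lo_get(16970));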
Thanks,
Nitin Motiani
Google
From 2848e8ae6692d991014a442fd9faac200a5bef96 Mon Sep 17 00:00:00 2001
From: Nitin Motiani <[email protected]>
Date: Wed, 31 Dec 2025 09:09:20 +0000
Subject: [PATCH v2 2/2] Add support to apply lo_write operations in the apply worker

* Handle REORDER_BUFFER_CHANGE_LOWRITE operations in the apply worker.
* This is done using lo_put. The patch refactors lo_put so that, in
  addition to the fmgr-callable be_lo_put(), there is a version that can
  be called directly from C code.
* This patch currently does not handle large objects in tablesync.
---
 src/backend/libpq/be-fsstubs.c           | 29 +++++++++++++++++++---------
 src/backend/replication/logical/worker.c | 24 ++++++++++++++++++++++++
 src/include/libpq/be-fsstubs.h           |  1 +
 3 files changed, 45 insertions(+), 9 deletions(-)

diff --git a/src/backend/libpq/be-fsstubs.c b/src/backend/libpq/be-fsstubs.c
index f27e374c4ee..ffabad9b708 100644
--- a/src/backend/libpq/be-fsstubs.c
+++ b/src/backend/libpq/be-fsstubs.c
@@ -202,6 +202,25 @@ lo_write(int fd, const char *buf, int len)
 	return status;
 }
 
+/*
+ * lo_put - write a chunk of data into a large object at the given offset.
+ *
+ * Like be_lo_put(), but directly callable from C code.
+ */
+void
+lo_put(Oid loOid, int64 offset, const char *str, int len)
+{
+	LargeObjectDesc *loDesc;
+	int			written PG_USED_FOR_ASSERTS_ONLY;
+
+	lo_cleanup_needed = true;
+	loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext);
+	inv_seek(loDesc, offset, SEEK_SET);
+	written = inv_write(loDesc, str, len);
+	Assert(written == len);
+	inv_close(loDesc);
+}
+
 Datum
 be_lo_lseek(PG_FUNCTION_ARGS)
 {
@@ -857,17 +876,9 @@ be_lo_put(PG_FUNCTION_ARGS)
 	Oid			loOid = PG_GETARG_OID(0);
 	int64		offset = PG_GETARG_INT64(1);
 	bytea	   *str = PG_GETARG_BYTEA_PP(2);
-	LargeObjectDesc *loDesc;
-	int			written PG_USED_FOR_ASSERTS_ONLY;
 
 	PreventCommandIfReadOnly("lo_put()");
 
-	lo_cleanup_needed = true;
-	loDesc = inv_open(loOid, INV_WRITE, CurrentMemoryContext);
-	inv_seek(loDesc, offset, SEEK_SET);
-	written = inv_write(loDesc, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
-	Assert(written == VARSIZE_ANY_EXHDR(str));
-	inv_close(loDesc);
-
+	lo_put(loOid, offset, VARDATA_ANY(str), VARSIZE_ANY_EXHDR(str));
 	PG_RETURN_VOID();
 }
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index ad281e7069b..4863e7cbc91 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -261,6 +261,7 @@
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/execPartition.h"
+#include "libpq/be-fsstubs.h"
 #include "libpq/pqformat.h"
 #include "miscadmin.h"
 #include "optimizer/optimizer.h"
@@ -3163,6 +3164,26 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualEnd(&epqstate);
 }
 
+/*
+ * Handle LO_WRITE message, applying it via lo_put().
+ */
+static void
+apply_handle_lowrite(StringInfo s)
+{
+	Oid			loid;
+	int64		offset;
+	Size		datalen;
+	char	   *data;
+
+	if (handle_streamed_transaction(LOGICAL_REP_MSG_LOWRITE, s))
+		return;
+
+	begin_replication_step();
+	logicalrep_read_lo_write(s, &loid, &offset, &datalen, &data);
+	lo_put(loid, offset, data, datalen);
+	end_replication_step();
+}
+
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
@@ -3868,6 +3889,9 @@ apply_dispatch(StringInfo s)
 			apply_handle_stream_prepare(s);
 			break;
 
+		case LOGICAL_REP_MSG_LOWRITE:
+			apply_handle_lowrite(s);
+			break;
 		default:
 			ereport(ERROR,
 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
diff --git a/src/include/libpq/be-fsstubs.h b/src/include/libpq/be-fsstubs.h
index 8775939f410..39e4e751b45 100644
--- a/src/include/libpq/be-fsstubs.h
+++ b/src/include/libpq/be-fsstubs.h
@@ -21,6 +21,7 @@
  */
 extern int	lo_read(int fd, char *buf, int len);
 extern int	lo_write(int fd, const char *buf, int len);
+extern void lo_put(Oid loOid, int64 offset, const char *str, int len);
 
 /*
  * Cleanup LOs at xact commit/abort
-- 
2.52.0.351.gbe84eed79e-goog
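One behavior of the decoding patch below that is worth calling out explicitly: because DecodeLargeObjectInsert re-emits whole pg_largeobject pages, a decoded LO_WRITE is always page-aligned and can cover more bytes than the original lowrite() call did. A standalone illustration of the offset math (this snippet is not part of the patch, and it assumes the default LOBLKSIZE of 2048, i.e. BLCKSZ/4):

    #include <stdio.h>

    #define LOBLKSIZE 2048

    int
    main(void)
    {
        /*
         * A 19-byte lowrite() at byte offset 10 modifies page 0 only, so
         * decoding emits one LO_WRITE with offset 0 and datalen 10 + 19 = 29,
         * which is exactly what the regression test output below shows.
         */
        int         pageno = 10 / LOBLKSIZE;
        long long   offset = (long long) pageno * LOBLKSIZE;

        printf("pageno = %d, decoded offset = %lld\n", pageno, offset);
        return 0;
    }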
From 30c343c440af07807015aff94dbe465aaf48564a Mon Sep 17 00:00:00 2001 From: Dilip Kumar <[email protected]> Date: Mon, 29 Dec 2025 09:37:52 +0000 Subject: [PATCH v2 1/2] Support large object decoding Introduce support for decoding changes to large objects in logical replication. Changes to 'pg_largeobject' are now intercepted in 'heap_decode' based on LargeObjectRelationId. Since a single large object operation (LO_WRITE) spans multiple physical rows in 'pg_largeobject', the changes are decoded and converted into a dedicated logical operation: REORDER_BUFFER_CHANGE_LOWRITE. --- contrib/test_decoding/Makefile | 2 +- .../expected/decoding_largeobject.out | 216 ++++++++++++++++++ .../sql/decoding_largeobject.sql | 94 ++++++++ contrib/test_decoding/test_decoding.c | 39 ++++ src/backend/replication/logical/decode.c | 133 +++++++++++ src/backend/replication/logical/proto.c | 41 ++++ .../replication/logical/reorderbuffer.c | 61 +++++ src/backend/replication/pgoutput/pgoutput.c | 23 ++ src/include/replication/logicalproto.h | 5 + src/include/replication/reorderbuffer.h | 12 + src/include/utils/rel.h | 6 +- 11 files changed, 630 insertions(+), 2 deletions(-) create mode 100644 contrib/test_decoding/expected/decoding_largeobject.out create mode 100644 contrib/test_decoding/sql/decoding_largeobject.sql diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index acbcaed2feb..d1f02500cf3 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin" REGRESS = ddl xact rewrite toast permissions decoding_in_xact \ decoding_into_rel binary prepared replorigin time messages \ - spill slot truncate stream stats twophase twophase_stream + spill slot truncate stream stats twophase twophase_stream decoding_largeobject ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \ oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \ twophase_snapshot slot_creation_error catalog_change_snapshot \ diff --git a/contrib/test_decoding/expected/decoding_largeobject.out b/contrib/test_decoding/expected/decoding_largeobject.out new file mode 100644 index 00000000000..a2720d82064 --- /dev/null +++ b/contrib/test_decoding/expected/decoding_largeobject.out @@ -0,0 +1,216 @@ +-- test that we can insert into the large objects and decode the changes +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? 
+---------- + init +(1 row) + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------ +(0 rows) + +-- Create a new large object +CREATE TABLE lotest_stash_values (loid oid, fd integer); +INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42); +-- NOTE: large objects require transactions +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values; + lowrite +--------- + 22 +(1 row) + +SELECT lo_close(fd) FROM lotest_stash_values; + lo_close +---------- + 0 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: INSERT: loid[oid]:16970 fd[integer]:null + COMMIT + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 22 data: large object test data + COMMIT +(7 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values; + lo_lseek +---------- + 10 +(1 row) + +SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values; + lowrite +--------- + 19 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 29 data: large objeoverwrite some data + COMMIT +(4 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values; + lo_lseek +---------- + 2048 +(1 row) + +SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values; + lowrite +--------- + 19 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +-------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 2048 datalen: 19 data: write into new page + COMMIT +(4 rows) + +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, ' +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 
2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +') FROM lotest_stash_values; + lowrite +--------- + 3829 +(1 row) + +SELECT lo_close(fd) FROM lotest_stash_values; + lo_close +---------- + 0 +(1 row) + +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +------------------------------------------------------------------------------------------------- + BEGIN + table public.lotest_stash_values: UPDATE: loid[oid]:16970 fd[integer]:0 + LO_WRITE: loid: 16970 offset: 0 datalen: 2048 data: + + test large data more in 
2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data m + LO_WRITE: loid: 16970 offset: 2048 datalen: 1781 data: ore in 2048 test large data more in 2048+ + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test 
large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + test large data more in 2048 test large data more in 2048 test large data more in 2048 + + + COMMIT +(5 rows) + +-- Clean up the slot +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/decoding_largeobject.sql b/contrib/test_decoding/sql/decoding_largeobject.sql new file mode 100644 index 00000000000..ff392de2fba --- /dev/null +++ b/contrib/test_decoding/sql/decoding_largeobject.sql @@ -0,0 +1,94 @@ +-- test that we can insert into the large objects and decode the changes + +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- slot works +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- Create a new large object +CREATE TABLE lotest_stash_values (loid oid, fd integer); + +INSERT INTO lotest_stash_values (loid) SELECT lo_creat(42); + +-- NOTE: large objects require transactions +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, 'large object test data') FROM lotest_stash_values; +SELECT lo_close(fd) FROM lotest_stash_values; +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 10, 0) FROM lotest_stash_values; +SELECT lowrite(fd, 'overwrite some data') FROM lotest_stash_values; +END; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd=lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lo_lseek(fd, 2048, 0) FROM lotest_stash_values; +SELECT lowrite(fd, 'write into new page') FROM lotest_stash_values; +END; + +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +BEGIN; +UPDATE lotest_stash_values SET fd = lo_open(loid, CAST(x'20000' | x'40000' AS integer)); +SELECT lowrite(fd, ' +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 
test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +test large data more in 2048 test large data more in 2048 test large data more in 2048 +') FROM lotest_stash_values; +SELECT lo_close(fd) FROM lotest_stash_values; +END; +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + + +-- Clean up the slot +SELECT pg_drop_replication_slot('regression_slot'); \ No newline at end of file diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index e104aa5aa6e..cf153c7661b 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -470,6 +470,38 @@ pg_decode_filter(LogicalDecodingContext 
*ctx,
 	return false;
 }
 
+static void
+pg_decode_lo_write(LogicalDecodingContext *ctx,
+				   ReorderBufferChange *change)
+{
+	TestDecodingData *data;
+	MemoryContext old;
+	Oid			loid = change->data.lo_write.loid;
+	int64		offset = change->data.lo_write.offset;
+	Size		datalen = change->data.lo_write.datalen;
+	char	   *lodata = change->data.lo_write.data;
+
+	data = ctx->output_plugin_private;
+
+	/* Avoid leaking memory by using and resetting our own context */
+	old = MemoryContextSwitchTo(data->context);
+
+	OutputPluginPrepareWrite(ctx, true);
+
+	appendStringInfoString(ctx->out, "LO_WRITE:");
+	appendStringInfo(ctx->out, " loid: %u offset: " INT64_FORMAT " datalen: %zu data: ",
+					 loid, offset, datalen);
+
+	appendBinaryStringInfo(ctx->out, lodata, datalen);
+
+	/* The payload is appended verbatim; that is fine for the textual test data used here. */
+
+	MemoryContextSwitchTo(old);
+	MemoryContextReset(data->context);
+
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
@@ -619,6 +651,13 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	}
 	txndata->xact_wrote_changes = true;
 
+	/* Handle large object changes independently of the table changes. */
+	if (change->action == REORDER_BUFFER_CHANGE_LOWRITE)
+	{
+		pg_decode_lo_write(ctx, change);
+		return;
+	}
+
 	class_form = RelationGetForm(relation);
 	tupdesc = RelationGetDescr(relation);
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index e25dd6bc366..a60f66cb68e 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -26,6 +26,7 @@
  */
 #include "postgres.h"
 
+#include "access/detoast.h"
 #include "access/heapam_xlog.h"
 #include "access/transam.h"
 #include "access/xact.h"
@@ -33,11 +34,13 @@
 #include "access/xlogreader.h"
 #include "access/xlogrecord.h"
 #include "catalog/pg_control.h"
+#include "catalog/pg_largeobject.h"
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/snapbuild.h"
+#include "storage/large_object.h"
 #include "storage/standbydefs.h"
 
 /* individual record(group)'s handlers */
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						bool two_phase);
 static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 						  xl_xact_parsed_prepare *parsed);
+static void DecodeLargeObjectInsert(LogicalDecodingContext *ctx,
+									XLogRecordBuffer *buf);
+static void DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx,
+									 XLogRecordBuffer *buf);
 
 
 /* common function to decode tuples */
@@ -466,6 +473,8 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	uint8		info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
 	TransactionId xid = XLogRecGetXid(buf->record);
 	SnapBuild  *builder = ctx->snapshot_builder;
+	RelFileLocator target_locator;
+	XLogReaderState *r = buf->record;
 
 	ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
 
@@ -480,6 +489,22 @@ heap_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
 		return;
 
+	XLogRecGetBlockTag(r, 0, &target_locator, NULL, NULL);
+
+	/*
+	 * Check if the WAL record pertains to 'pg_largeobject'. If it does,
+	 * handle the large object changes separately via
+	 * DecodeLargeObjectChanges, bypassing the standard heap table decoding
+	 * logic that follows.
+	 */
+	if (target_locator.relNumber == LargeObjectRelationId)
+	{
+		if (SnapBuildProcessChange(builder, xid, buf->origptr) &&
+			!ctx->fast_forward)
+			DecodeLargeObjectChanges(info, ctx, buf);
+		return;
+	}
+
 	switch (info)
 	{
 		case XLOG_HEAP_INSERT:
@@ -1309,3 +1334,111 @@ DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	return false;
 }
+
+/*
+ * Helper function to decode a 'pg_largeobject' INSERT record into a
+ * 'REORDER_BUFFER_CHANGE_LOWRITE' change.
+ *
+ * Each row in 'pg_largeobject' represents only a small page (or chunk) of
+ * a large object's data.  Logically, these individual page-level inserts
+ * are not meaningful on their own to a consumer.  Therefore, instead of
+ * treating them as regular heap tuple changes, we convert the physical
+ * page insert into a single, more meaningful logical operation: a
+ * 'LO_WRITE' change, which can be applied as an independent large object
+ * operation.
+ */
+static void
+DecodeLargeObjectInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	XLogReaderState *r = buf->record;
+	ReorderBufferChange *change;
+	Size		datalen;
+	char	   *tupledata;
+	HeapTuple	tuple;
+	bytea	   *data_chunk;
+	Oid			loid;
+	int32		pageno;
+	int64		offset;
+	Size		chunk_datalen;
+	char	   *data_copy;
+	bool		freeit = false;
+	Form_pg_largeobject largeobject;
+
+	tupledata = XLogRecGetBlockData(r, 0, &datalen);
+	if (datalen == 0)
+		return;
+
+	tuple = ReorderBufferAllocTupleBuf(ctx->reorder, datalen - SizeOfHeapHeader);
+	DecodeXLogTuple(tupledata, datalen, tuple);
+	largeobject = (Form_pg_largeobject) GETSTRUCT(tuple);
+
+	/* Fetch loid, pageno and actual data from the pg_largeobject tuple. */
+	loid = largeobject->loid;
+	pageno = largeobject->pageno;
+	data_chunk = &(largeobject->data);
+	if (VARATT_IS_EXTENDED(data_chunk))
+	{
+		data_chunk = (bytea *)
+			detoast_attr((struct varlena *) data_chunk);
+		freeit = true;
+	}
+	chunk_datalen = VARSIZE(data_chunk) - VARHDRSZ;
+
+	/*
+	 * Convert the single 'pg_largeobject' row (which represents a data page)
+	 * into a logical 'LOWRITE' operation.  The absolute offset for this write
+	 * is computed by multiplying the page number ('pageno') by the fixed
+	 * large object block size (LOBLKSIZE).
+	 */
+	offset = (int64) pageno * LOBLKSIZE;
+
+	/* Copy the chunk into memory owned by the reorder buffer. */
+	data_copy = ReorderBufferAllocRawBuffer(ctx->reorder, chunk_datalen);
+	memcpy(data_copy, VARDATA(data_chunk), chunk_datalen);
+
+	/* Create the LOWRITE change */
+	change = ReorderBufferAllocChange(ctx->reorder);
+	change->action = REORDER_BUFFER_CHANGE_LOWRITE;
+	change->origin_id = XLogRecGetOrigin(r);
+
+	change->data.lo_write.loid = loid;
+	change->data.lo_write.offset = offset;
+	change->data.lo_write.datalen = chunk_datalen;
+	change->data.lo_write.data = data_copy;
+
+	/* Enqueue the change */
+	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+							 change, false);
+	ReorderBufferFreeTupleBuf(tuple);
+	if (freeit)
+		pfree(data_chunk);
+}
+
+/*
+ * Processes and decodes all logical changes for large objects (LOs).
+ * Since LO data is spread across 'pg_largeobject' rows, this function
+ * maps physical changes (INSERT/UPDATE) to a single logical 'LO_WRITE'
+ * operation.
+ *
+ * TODO: Temporarily ignoring LO_UNLINK (DELETE), which will be
+ * handled during a later phase.
+ */
+static void
+DecodeLargeObjectChanges(uint8 info, LogicalDecodingContext *ctx,
+						 XLogRecordBuffer *buf)
+{
+	switch (info)
+	{
+		case XLOG_HEAP_INSERT:
+		case XLOG_HEAP_HOT_UPDATE:
+		case XLOG_HEAP_UPDATE:
+			DecodeLargeObjectInsert(ctx, buf);
+			break;
+		case XLOG_HEAP_DELETE:
+			/* LO_UNLINK (delete) is handled in a later phase */
+			break;
+		default:
+			/* Ignore other operations on pg_largeobject for now */
+			break;
+	}
+}
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 3950dd0cf46..bfc8678e3df 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -346,6 +346,45 @@ logicalrep_read_rollback_prepared(StringInfo in,
 	strlcpy(rollback_data->gid, pq_getmsgstring(in), sizeof(rollback_data->gid));
 }
 
+void
+logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid,
+						  int64 offset, Size datalen, const char *data)
+{
+	pq_sendbyte(out, LOGICAL_REP_MSG_LOWRITE);
+	/* transaction ID (only valid when streaming an in-progress xact) */
+	if (TransactionIdIsValid(xid))
+		pq_sendint32(out, xid);
+	/* Write LO ID, offset, data length, then the data chunk */
+	pq_sendint32(out, loid);
+	pq_sendint64(out, offset);
+	pq_sendint32(out, datalen);
+	pq_sendbytes(out, data, datalen);
+}
+
+void
+logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen,
+						 char **data)
+{
+	/* Read and validate the fields */
+	*loid = pq_getmsgint(s, 4);
+	if (!OidIsValid(*loid))
+		elog(ERROR, "large object ID is not set in LO write message");
+
+	*offset = pq_getmsgint64(s);
+	if (*offset < 0)
+		elog(ERROR, "invalid offset " INT64_FORMAT " in LO write message", *offset);
+
+	*datalen = pq_getmsgint(s, 4);
+	if (*datalen > PG_INT32_MAX)	/* a negative value on the wire shows up as huge */
+		elog(ERROR, "invalid data length %zu in LO write message", *datalen);
+
+	/* Allocate memory for the data payload */
+	*data = palloc(*datalen);
+
+	/* Read the data payload directly into the new buffer */
+	pq_copymsgbytes(s, *data, *datalen);
+}
+
 /*
  * Write STREAM PREPARE to the output stream.
  */
@@ -1235,6 +1274,8 @@ logicalrep_message_type(LogicalRepMsgType action)
 			return "TYPE";
 		case LOGICAL_REP_MSG_MESSAGE:
 			return "MESSAGE";
+		case LOGICAL_REP_MSG_LOWRITE:
+			return "LOWRITE";
 		case LOGICAL_REP_MSG_BEGIN_PREPARE:
 			return "BEGIN PREPARE";
 		case LOGICAL_REP_MSG_PREPARE:
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index a0293f6ec7c..a4bad88f4ab 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -579,6 +579,13 @@ ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			if (change->data.lo_write.data != NULL)
+			{
+				pfree(change->data.lo_write.data);
+				change->data.lo_write.data = NULL;
+			}
+			break;
 	}
 
 	pfree(change);
@@ -930,6 +937,19 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
 	}
 }
 
+/*
+ * Allocate a raw buffer from the reorder buffer's tuple context.
+ */
+void *
+ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len)
+{
+	void	   *buffer;
+
+	buffer = MemoryContextAlloc(rb->tup_context, alloc_len);
+
+	return buffer;
+}
+
 /*
  * AssertTXNLsnOrder
  *		Verify LSN ordering of transaction lists in the reorderbuffer
@@ -2585,6 +2605,10 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 					elog(ERROR, "tuplecid value in changequeue");
 					break;
+
+				case REORDER_BUFFER_CHANGE_LOWRITE:
+					ReorderBufferApplyChange(rb, txn, NULL, change, streaming);
+					break;
 			}
 
 			/*
@@ -4267,6 +4291,26 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				char	   *data;
+				Size		datalen = change->data.lo_write.datalen;
+
+				sz += datalen;
+
+				/* make sure we have enough space */
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+
+				/* might have been reallocated above */
+				ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+				/* Copy the data payload to just after the serialized change */
+				memcpy(data, change->data.lo_write.data, datalen);
+
+				break;
+			}
 	}
 
 	ondisk->size = sz;
@@ -4531,6 +4575,11 @@ ReorderBufferChangeSize(ReorderBufferChange *change)
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			/* ReorderBufferChange contains everything important */
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				sz += change->data.lo_write.datalen;
+				break;
+			}
 	}
 
 	return sz;
@@ -4830,6 +4879,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
 		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
 			break;
+		case REORDER_BUFFER_CHANGE_LOWRITE:
+			{
+				Size		datalen = change->data.lo_write.datalen;
+
+				/* Allocate memory for the data payload */
+				change->data.lo_write.data = MemoryContextAlloc(rb->context, datalen);
+
+				/* Copy the data payload */
+				memcpy(change->data.lo_write.data, data, datalen);
+
+				break;
+			}
 	}
 
 	dlist_push_tail(&txn->changes, &change->node);
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 9ee8949e040..378863a0480 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -1495,6 +1495,29 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	TupleTableSlot *old_slot = NULL;
 	TupleTableSlot *new_slot = NULL;
 
+	if (change->action == REORDER_BUFFER_CHANGE_LOWRITE)
+	{
+		if (txndata && !txndata->sent_begin_txn)
+			pgoutput_send_begin(ctx, txn);
+
+		/* Remember the xid for the change in streaming mode.
*/ + if (data->in_streaming) + xid = change->txn->xid; + + OutputPluginPrepareWrite(ctx, true); + + /* Use the new helper to serialize the LO payload */ + logicalrep_write_lo_write(ctx->out, xid, + change->data.lo_write.loid, + change->data.lo_write.offset, + change->data.lo_write.datalen, + change->data.lo_write.data); + + OutputPluginWrite(ctx, true); + + return; + } + if (!is_publishable_relation(relation)) return; diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 058a955e20c..ba65ee406be 100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -66,6 +66,7 @@ typedef enum LogicalRepMsgType LOGICAL_REP_MSG_RELATION = 'R', LOGICAL_REP_MSG_TYPE = 'Y', LOGICAL_REP_MSG_MESSAGE = 'M', + LOGICAL_REP_MSG_LOWRITE = 'W', LOGICAL_REP_MSG_BEGIN_PREPARE = 'b', LOGICAL_REP_MSG_PREPARE = 'P', LOGICAL_REP_MSG_COMMIT_PREPARED = 'K', @@ -214,6 +215,10 @@ extern void logicalrep_write_rollback_prepared(StringInfo out, ReorderBufferTXN TimestampTz prepare_time); extern void logicalrep_read_rollback_prepared(StringInfo in, LogicalRepRollbackPreparedTxnData *rollback_data); +extern void logicalrep_write_lo_write(StringInfo out, TransactionId xid, Oid loid, + int64 offset, Size datalen, const char *data); +extern void logicalrep_read_lo_write(StringInfo s, Oid *loid, int64 *offset, Size *datalen, + char **data); extern void logicalrep_write_stream_prepare(StringInfo out, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); extern void logicalrep_read_stream_prepare(StringInfo in, diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index 314e35592c0..444d553f63a 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -61,6 +61,7 @@ typedef enum ReorderBufferChangeType REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM, REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT, REORDER_BUFFER_CHANGE_TRUNCATE, + REORDER_BUFFER_CHANGE_LOWRITE, } ReorderBufferChangeType; /* forward declaration */ @@ -154,6 +155,16 @@ typedef struct ReorderBufferChange uint32 ninvalidations; /* Number of messages */ SharedInvalidationMessage *invalidations; /* invalidation message */ } inval; + + /* Lo write */ + struct + { + Oid loid; + int64 offset; + Size datalen; + char *data; + } lo_write; + } data; /* @@ -722,6 +733,7 @@ extern void ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, Snapshot snap, XLogRecPtr lsn, bool transactional, const char *prefix, Size message_size, const char *message); +extern void *ReorderBufferAllocRawBuffer(ReorderBuffer *rb, Size alloc_len); extern void ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, XLogRecPtr commit_lsn, XLogRecPtr end_lsn, TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index d03ab247788..2a4b77ba26c 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -19,6 +19,7 @@ #include "catalog/catalog.h" #include "catalog/pg_class.h" #include "catalog/pg_index.h" +#include "catalog/pg_largeobject.h" #include "catalog/pg_publication.h" #include "nodes/bitmapset.h" #include "partitioning/partdefs.h" @@ -707,12 +708,15 @@ RelationCloseSmgr(Relation relation) * it would complicate decoding slightly for little gain). Note that we *do* * log information for user defined catalog tables since they presumably are * interesting to the user... 
+ *
+ * TODO: Make logical logging of pg_largeobject rows conditional on a
+ * configuration parameter instead of enabling it unconditionally.
  */
 #define RelationIsLogicallyLogged(relation) \
 	(XLogLogicalInfoActive() && \
 	 RelationNeedsWAL(relation) && \
 	 (relation)->rd_rel->relkind != RELKIND_FOREIGN_TABLE && \
-	 !IsCatalogRelation(relation))
+	 (!IsCatalogRelation(relation) || RelationGetRelid(relation) == LargeObjectRelationId))
 
 /* routines in utils/cache/relcache.c */
 extern void RelationIncrementReferenceCount(Relation rel);
-- 
2.52.0.351.gbe84eed79e-goog
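PS: On the tablesync side, the direction I'm currently exploring is to copy the existing pg_largeobject rows during the initial sync and materialize them on the subscriber under the same OIDs. Very roughly, and untested -- the staging table staged_lo and the hardcoded 2048 (the default LOBLKSIZE) below are placeholders, not anything the patches implement:

    -- staged_lo(loid oid, pageno int4, data bytea) holds pg_largeobject
    -- rows copied from the publisher
    SELECT lo_create(loid) FROM (SELECT DISTINCT loid FROM staged_lo) s;

    SELECT lo_put(loid, pageno::int8 * 2048, data)
      FROM staged_lo
     ORDER BY loid, pageno;

Whether this can piggyback on the existing tablesync copy machinery is the part I'm still figuring out.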
