> Then why didn't you specify PARALLEL UNSAFE as well?

You are correct, I missed marking the function as PARALLEL UNSAFE. I’ve
attached a revised patch with the correct annotation.

> BTW, yesterday a new thread started with the same requirement [1]. It
> uses a slightly different way to define the new function. Do you have
> any opinion on it?

I don’t think introducing a separate function is a good idea. It would
provide effectively the same behavior while adding technical debt and
maintenance overhead, with no clear benefit. Our patch keeps a single
function and adds a parameter with a default value, so existing
one-argument calls keep working and it is not a breaking change. I
therefore believe our approach is preferable. That said, the fact that
another patch proposes the same capability suggests there is broader
demand for this change.

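To make the "single function with a default parameter" point concrete, here is a rough sketch of how both call forms might look with the patch applied. It is an illustration only, not a tested script: the origin name demo_origin, the table demo_table, the PID 12345, and the LSN and timestamp values are all placeholders.

```sql
-- One-time setup (any session): create the origin the sessions will share.
-- 'demo_origin' is a placeholder name.
SELECT pg_replication_origin_create('demo_origin');

-- Session A (first process): pid is omitted, so it defaults to 0.
-- This is the same call that works without the patch.
SELECT pg_backend_pid();  -- note the result for the other sessions
SELECT pg_replication_origin_session_setup('demo_origin');

-- Session B (additional process): pass session A's PID.
-- 12345 stands in for the value returned by pg_backend_pid() in session A.
SELECT pg_replication_origin_session_setup('demo_origin', 12345);

-- In each session: apply changes, record the origin LSN and commit
-- timestamp, then commit. The LSN and timestamp below are placeholders.
BEGIN;
INSERT INTO demo_table VALUES (1);  -- demo_table is hypothetical
SELECT pg_replication_origin_xact_setup('0/16B3748', '2025-09-08 14:22:15+03');
COMMIT;

-- Detach the session from the origin when done.
SELECT pg_replication_origin_session_reset();
```

Nothing in the sketch enforces commit ordering across sessions; as the documentation change in the patch notes, that remains the caller's responsibility.
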
From 74a74fd02bce786093c19a23bef9444d0b8ef41d Mon Sep 8 00:00:00 2025
From: Doruk <[email protected]>
Date: Mon, 8 Sep 2025 14:22:15 +0300
Subject: [PATCH v6] pg_replication_origin_session_setup: pid parameter

Since the introduction of parallel apply workers (commit 216a784829c),
the replorigin_session_setup() was extended to accept an extra
parameter: pid. This process ID is used to inform that multiple
processes are sharing the same replication origin to apply changes in
parallel. The replorigin_session_setup function has a SQL user
interface: pg_replication_origin_session_setup.

This commit adds an optional parameter that passes the process ID to the
internal function replorigin_session_setup. It allows multiple processes
to use the same replication origin if you are using the replication
functions.
---
 doc/src/sgml/func/func-admin.sgml        | 22 ++++++++++++++++++++--
 src/backend/catalog/system_functions.sql |  9 ++++++++-
 src/backend/replication/logical/origin.c |  4 +++-
 src/include/catalog/pg_proc.dat          |  2 +-
 4 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml
index 446fdfe..4b86676 100644
--- a/doc/src/sgml/func/func-admin.sgml
+++ b/doc/src/sgml/func/func-admin.sgml
@@ -1315,7 +1315,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         <indexterm>
          <primary>pg_replication_origin_session_setup</primary>
         </indexterm>
-        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> )
+        <function>pg_replication_origin_session_setup</function> ( <parameter>node_name</parameter> <type>text</type> <optional>, <parameter>pid</parameter> <type>integer</type> <literal>DEFAULT</literal> <literal>0</literal></optional> )
         <returnvalue>void</returnvalue>
        </para>
        <para>
@@ -1323,7 +1323,26 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         origin, allowing replay progress to be tracked.
         Can only be used if no origin is currently selected.
         Use <function>pg_replication_origin_session_reset</function> to undo.
-       </para></entry>
+        If multiple processes can safely use the same replication origin (for
+        example, parallel apply processes), the optional <parameter>pid</parameter>
+        parameter can be used to specify the process ID of the first process.
+        The first process must provide <parameter>pid</parameter> equals to
+        <literal>0</literal> and the other processes that share the same
+        replication origin should provide the process ID of the first process.
+       </para>
+       <caution>
+        <para>
+         When multiple processes share the same replication origin, it is critical
+         to maintain commit order to prevent data inconsistency. While processes
+         may send operations out of order, they must commit transactions in the
+         correct sequence to ensure proper replication consistency. The recommended workflow
+         for each worker is: set up the replication origin session with the first process's PID,
+         apply changes within transactions, call <function>pg_replication_origin_xact_setup</function>
+         with the LSN and commit timestamp before committing, then commit the
+         transaction only if everything succeeded.
+        </para>
+       </caution>
+      </entry>
      </row>
 
      <row>
diff --git a/src/backend/catalog/system_functions.sql b/src/backend/catalog/system_functions.sql
index 566f308..f60287d 100644
--- a/src/backend/catalog/system_functions.sql
+++ b/src/backend/catalog/system_functions.sql
@@ -650,6 +650,13 @@ LANGUAGE INTERNAL
 CALLED ON NULL INPUT VOLATILE PARALLEL SAFE
 AS 'pg_stat_reset_slru';
 
+CREATE OR REPLACE FUNCTION
+  pg_replication_origin_session_setup(node_name text, pid integer DEFAULT 0)
+RETURNS void
+LANGUAGE INTERNAL
+STRICT VOLATILE PARALLEL UNSAFE
+AS 'pg_replication_origin_session_setup';
+
 --
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
@@ -751,7 +758,7 @@ REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_progress(boolean) FROM
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_reset() FROM public;
 
-REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_replication_origin_session_setup(text, integer) FROM public;
 
 REVOKE EXECUTE ON FUNCTION pg_replication_origin_xact_reset() FROM public;
 
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index 87f10e5..98d47e1 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -1374,12 +1374,14 @@ pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
 {
 	char	   *name;
 	RepOriginId origin;
+	int			pid;
 
 	replorigin_check_prerequisites(true, false);
 
 	name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
 	origin = replorigin_by_name(name, false);
-	replorigin_session_setup(origin, 0);
+	pid = PG_GETARG_INT32(1);
+	replorigin_session_setup(origin, pid);
 
 	replorigin_session_origin = origin;
 
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 118d6da..dd2d938 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -12223,7 +12223,7 @@
 { oid => '6006',
   descr => 'configure session to maintain replication progress tracking for the passed in origin',
   proname => 'pg_replication_origin_session_setup', provolatile => 'v',
-  proparallel => 'u', prorettype => 'void', proargtypes => 'text',
+  proparallel => 'u', prorettype => 'void', proargtypes => 'text int4',
   prosrc => 'pg_replication_origin_session_setup' },
 { oid => '6007',
   descr => 'teardown configured replication progress tracking',
