On Wed, Jan 21, 2026 at 7:49 PM shveta malik <[email protected]> wrote:
>
> On Thu, Jan 22, 2026 at 5:19 AM Masahiko Sawada <[email protected]> wrote:
> >
> > On Mon, Jan 19, 2026 at 10:38 PM shveta malik <[email protected]> wrote:
> > >
> > > On Wed, Jan 14, 2026 at 11:24 PM Masahiko Sawada <[email protected]> wrote:
> > > >
> > > > I've attached the updated patch.
> > >
> > > Thank You for the patch. I like the idea of optimization. Few initial
> > > comments:
> >
> > Thank you for reviewing the patch!
> >
> > > 1)
> > > + * The query returns the slot names and their caught-up status in
> > > + * the same order as the results collected by
> > > + * get_old_cluster_logical_slot_infos(). If this query is changed,
> > >
> > > I could not find the function get_old_cluster_logical_slot_infos(), do
> > > you mean get_old_cluster_logical_slot_infos_query()?
> >
> > It seems an oversight in commit 6d3d2e8e541f0. I think it should be
> > get_db_rel_and_slot_infos().
> >
> > > 2)
> > > " WHERE database = current_database() AND "
> > > "       slot_type = 'logical' AND "
> > >
> > > Is there a reason why database = current_database() is placed before
> > > slot_type = 'logical'? I am not sure how the PostgreSQL optimizer and
> > > executor will order these predicates, but from the first look,
> > > slot_type = 'logical' appears cheaper and could be placed first,
> > > consistent with the ordering used at other places.
> >
> > Changed.
> >
> > > 3)
> > > Shouldn’t we add a sanity check inside
> > > get_old_cluster_logical_slot_infos_query() to ensure that when
> > > skip_caught_up_check is true, we are on PostgreSQL 18 or lower? This
> > > would make the function safer for future use if it's called elsewhere.
> > > I understand the caller already performs a similar check, but I think
> > > it's more appropriate here since we call
> > > binary_upgrade_logical_slot_has_caught_up() from inside, which doesn’t
> > > even exist on newer versions.
> >
> > What kind of sanity check did you mean? We can have a check with
> > pg_fatal() but it seems almost the same to me even if pg_upgrade fails
> > with an error due to missing
> > binary_upgrade_logical_slot_has_caught_up().
>
> I was referring to a development-level sanity check, something like:
>
> /* skip_caught_up_check is required iff PG19 or newer */
> Assert((GET_MAJOR_VERSION(cluster->major_version) >= 1900) ==
>        skip_caught_up_check);
>
> But performing this check requires access to the cluster version (or
> cluster information), which this function currently does not have.
> Given that, do you think it would make sense to pass the cluster as an
> argument to this function in order to perform the sanity check here?
Hmm, I think it's better not to have the same check in multiple places,
but it might make sense to have get_old_cluster_logical_slot_infos_query()
decide whether to use the fast method. I've updated the patch accordingly;
please review it.

> > > 4)
> > > +# Check the file content. While both test_slot1 and test_slot2 should be reporting
> > > +# that they have unconsumed WAL records, test_slot3 should not be reported as
> > > +# it has caught up.
> > >
> > > Can you please elaborate the reason behind test_slot3 not being
> > > reported? Also mention in the comment if possible.
> >
> > We advance test_slot3 to the current WAL LSN before executing
> > pg_upgrade, so the test_slot3 should have consumed all pending WALs.
> > Please refer to the following changes:
>
> I understand the test, and the comments are clear to me. I also
> understand that only test_slot3 is expected to be in the caught-up
> state. My questions were specifically about the following points:
>
> 1) Why do we expect 'slot3 caught-up' not to be mentioned in the LOG?
> Is it simply because there is no corresponding logging in the code, or
> is this behavior related to some aspect of your fix that I may have
> missed?
>
> 2) In general, we do not validate the absence of LOG messages in
> tests. Why is this considered a special case where such a check is
> appropriate?

What LOG do you refer to? In these tests, we check the
invalid_logical_slots.txt file, where pg_upgrade reports only invalid
slots (in terms of pg_upgrade). For test_slot3, it should not be
mentioned in that file as it has caught up. Given that the file has
only invalid slots, checking the absence of test_slot3 in the file
makes sense to me.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
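For readers following the thread, the optimization under discussion can be sketched outside PostgreSQL. The snippet below is a hypothetical Python simulation, not PostgreSQL code: the helper `find_last_pending_wal` and the integer LSNs are invented stand-ins for `binary_upgrade_check_logical_slot_pending_wal()` and real WAL positions. It models the per-database logic the patch expresses in SQL: scan the WAL once, starting from the slot with the minimum confirmed_flush_lsn, and derive every slot's caught-up status from the single returned LSN.

```python
def check_slots_caught_up(slots, find_last_pending_wal):
    """Simulate the patch's one-scan-per-database caught-up check.

    slots: list of dicts with 'name', 'confirmed_flush_lsn', and
    'invalidation_reason' (None if the slot is valid).
    find_last_pending_wal(name, cutoff): stand-in for the upgrade
    function; returns the last decodable record's LSN, or None if the
    slot has no pending decodable WAL.
    """
    valid = [s for s in slots if s["invalidation_reason"] is None]
    if not valid:
        # No valid slot to scan from; invalidated slots are never caught up.
        return {s["name"]: False for s in slots}

    # Early-cutoff LSN: a decodable record at or beyond the maximum
    # confirmed_flush_lsn proves that no slot has caught up.
    cutoff = max(s["confirmed_flush_lsn"] for s in valid)

    # Scan only from the slot with the minimum confirmed_flush_lsn.
    min_slot = min(valid, key=lambda s: s["confirmed_flush_lsn"])
    last_pending = find_last_pending_wal(min_slot["name"], cutoff)

    result = {}
    for s in slots:
        if s["invalidation_reason"] is not None:
            result[s["name"]] = False
        else:
            # Mirrors the CASE expression in slot_caught_up_info_query:
            # caught up iff nothing is pending, or this slot has already
            # confirmed past the last pending record.
            result[s["name"]] = (last_pending is None
                                 or s["confirmed_flush_lsn"] > last_pending)
    return result


# One WAL scan decides all three slots. Suppose the scan starting from
# slot "a" reports the last decodable record at LSN 150: "a" still has
# pending WAL, while "b" and "c" are caught up.
slots = [
    {"name": "a", "confirmed_flush_lsn": 100, "invalidation_reason": None},
    {"name": "b", "confirmed_flush_lsn": 200, "invalidation_reason": None},
    {"name": "c", "confirmed_flush_lsn": 300, "invalidation_reason": None},
]
print(check_slots_caught_up(slots, lambda name, cutoff: 150))
```

This also shows why a false positive from an origin-filtering plugin is possible but bounded: the single scan's result is shared, so a record undecodable for the scanned slot but decodable for another would be missed for all of them.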
From e436efa536923a13bd22a503976a3d7bb8a51650 Mon Sep 17 00:00:00 2001
From: Masahiko Sawada <[email protected]>
Date: Wed, 7 Jan 2026 12:06:35 -0800
Subject: [PATCH v7] pg_upgrade: Optimize replication slot caught-up check.

Commit 29d0a77fa6 improved pg_upgrade to allow migrating logical slots
provided that all logical slots have caught up (i.e., they have no
pending decodable WAL records). Previously, this verification was done
by checking each slot individually, which could be time-consuming if
there were many logical slots to migrate.

This commit optimizes the check to avoid reading the same WAL stream
multiple times. It performs the check only for the slot with the
minimum confirmed_flush_lsn and applies the result to all other slots
in the same database. This limits the check to at most one logical
slot per database. During the check, we identify the last decodable
WAL record's LSN to report any slots with unconsumed records,
consistent with the existing error reporting behavior. Additionally,
the maximum confirmed_flush_lsn is used as an early scan cutoff;
finding a decodable WAL record beyond this point implies that no slot
has caught up.

Performance testing demonstrated that the execution time remains
stable regardless of the number of slots in the database.

Note that we do not distinguish slots based on their output plugins. A
hypothetical plugin might use a replication origin filter that filters
out changes from a specific origin. In such cases, we might get a
false positive (erroneously considering a slot caught up). However,
this is safe from a data integrity standpoint; such scenarios are
rare, and the impact of a false positive is minimal.

This optimization is applied only when the old cluster is version 19
or later.

XXX Bump catversion.
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: shveta malik <[email protected]>
Discussion: https://postgr.es/m/CAD21AoBZ0LAcw1OHGEKdW7S5TRJaURdhEk3CLAW69_siqfqyAg@mail.gmail.com
---
 src/backend/replication/logical/logical.c  |  37 ++++--
 src/backend/utils/adt/pg_upgrade_support.c |  14 ++-
 src/bin/pg_upgrade/check.c                 |   2 +-
 src/bin/pg_upgrade/info.c                  | 140 +++++++++++++++++++--
 src/bin/pg_upgrade/t/003_logical_slots.pl  |  25 ++--
 src/include/catalog/pg_proc.dat            |   6 +-
 src/include/replication/logical.h          |   3 +-
 7 files changed, 190 insertions(+), 37 deletions(-)

diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index b0ef1a12520..452acd1771d 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1986,16 +1986,22 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
 }
 
 /*
- * Read up to the end of WAL starting from the decoding slot's restart_lsn.
- * Return true if any meaningful/decodable WAL records are encountered,
- * otherwise false.
+ * Read WAL records starting from the decoding slot's restart_lsn up to
+ * end_of_wal in order to check whether any meaningful/decodable WAL
+ * records are encountered. scan_cutoff_lsn is the LSN after which the
+ * scan can be terminated early once a decodable WAL record is found.
+ *
+ * Returns the last decodable WAL record's LSN if one is found, otherwise
+ * returns InvalidXLogRecPtr.
  */
-bool
-LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
+XLogRecPtr
+LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal,
+                                    XLogRecPtr scan_cutoff_lsn)
 {
-    bool        has_pending_wal = false;
+    XLogRecPtr  last_pending_wal = InvalidXLogRecPtr;
 
     Assert(MyReplicationSlot);
+    Assert(end_of_wal >= scan_cutoff_lsn);
 
     PG_TRY();
     {
@@ -2024,7 +2030,7 @@
         InvalidateSystemCaches();
 
         /* Loop until the end of WAL or some changes are processed */
-        while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
+        while (ctx->reader->EndRecPtr < end_of_wal)
         {
             XLogRecord *record;
             char       *errm = NULL;
@@ -2037,7 +2043,20 @@
             if (record != NULL)
                 LogicalDecodingProcessRecord(ctx, ctx->reader);
 
-            has_pending_wal = ctx->processing_required;
+            if (ctx->processing_required)
+            {
+                last_pending_wal = ctx->reader->ReadRecPtr;
+
+                /*
+                 * If we find a decodable WAL after the scan_cutoff_lsn point,
+                 * we can terminate the scan early.
+                 */
+                if (last_pending_wal >= scan_cutoff_lsn)
+                    break;
+
+                /* Reset the flag and continue checking */
+                ctx->processing_required = false;
+            }
 
             CHECK_FOR_INTERRUPTS();
         }
@@ -2055,7 +2074,7 @@
     }
     PG_END_TRY();
 
-    return has_pending_wal;
+    return last_pending_wal;
 }
 
 /*
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 8953a17753e..ff3dde7a9b3 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -282,11 +282,12 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)
  * upgraded without data loss.
  */
 Datum
-binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
+binary_upgrade_check_logical_slot_pending_wal(PG_FUNCTION_ARGS)
 {
     Name        slot_name;
     XLogRecPtr  end_of_wal;
-    bool        found_pending_wal;
+    XLogRecPtr  scan_cutoff_lsn;
+    XLogRecPtr  last_pending_wal;
 
     CHECK_IS_BINARY_UPGRADE;
 
@@ -297,6 +298,7 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
     Assert(has_rolreplication(GetUserId()));
 
     slot_name = PG_GETARG_NAME(0);
+    scan_cutoff_lsn = PG_GETARG_LSN(1);
 
     /* Acquire the given slot */
     ReplicationSlotAcquire(NameStr(*slot_name), true, true);
@@ -307,12 +309,16 @@
     Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);
 
     end_of_wal = GetFlushRecPtr(NULL);
-    found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal);
+    last_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal,
+                                                           scan_cutoff_lsn);
 
     /* Clean up */
     ReplicationSlotRelease();
 
-    PG_RETURN_BOOL(!found_pending_wal);
+    if (XLogRecPtrIsValid(last_pending_wal))
+        PG_RETURN_LSN(last_pending_wal);
+    else
+        PG_RETURN_NULL();
 }
 
 /*
diff --git a/src/bin/pg_upgrade/check.c b/src/bin/pg_upgrade/check.c
index 64805fef0eb..1632dcc9808 100644
--- a/src/bin/pg_upgrade/check.c
+++ b/src/bin/pg_upgrade/check.c
@@ -624,7 +624,7 @@ check_and_dump_old_cluster(void)
     {
         /*
          * Logical replication slots can be migrated since PG17. See comments
-         * atop get_old_cluster_logical_slot_infos().
+         * in get_db_rel_and_slot_infos().
          */
         check_old_cluster_for_valid_slots();
 
diff --git a/src/bin/pg_upgrade/info.c b/src/bin/pg_upgrade/info.c
index 2de0ee4d030..eeddbab968d 100644
--- a/src/bin/pg_upgrade/info.c
+++ b/src/bin/pg_upgrade/info.c
@@ -29,8 +29,11 @@ static void free_rel_infos(RelInfoArr *rel_arr);
 static void print_db_infos(DbInfoArr *db_arr);
 static void print_rel_infos(RelInfoArr *rel_arr);
 static void print_slot_infos(LogicalSlotInfoArr *slot_arr);
-static char *get_old_cluster_logical_slot_infos_query(void);
+static char *get_old_cluster_logical_slot_infos_query(ClusterInfo *cluster,
+                                                      bool *use_fast_caught_up_check);
 static void process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg);
+static void process_old_cluster_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res,
+                                                           void *arg);
 
 
 /*
@@ -307,11 +310,74 @@ get_db_rel_and_slot_infos(ClusterInfo *cluster)
     if (cluster == &old_cluster &&
         GET_MAJOR_VERSION(cluster->major_version) > 1600)
     {
-        logical_slot_infos_query = get_old_cluster_logical_slot_infos_query();
+        bool        use_fast_caught_up_check;
+
+        logical_slot_infos_query = get_old_cluster_logical_slot_infos_query(cluster,
+                                                                            &use_fast_caught_up_check);
+
         upgrade_task_add_step(task, logical_slot_infos_query,
                               process_old_cluster_logical_slot_infos,
                               true, NULL);
+
+        /*
+         * Use a separate query to efficiently check whether the slots have
+         * consumed all WAL records, unless this is a live_check.
+         */
+        if (use_fast_caught_up_check && !user_opts.live_check)
+        {
+            /*
+             * We optimize the slot caught-up check to avoid reading the same
+             * WAL stream multiple times: execute the caught-up check only for
+             * the slot with the minimum confirmed_flush_lsn, and apply the
+             * same result to all other slots in the same database. This
+             * limits the check to at most one logical slot per database. We
+             * also use the maximum confirmed_flush_lsn as an early scan
+             * cutoff; finding a decodable WAL record beyond this point
+             * implies that no slot has caught up.
+             *
+             * Note that we don't distinguish slots based on their output
+             * plugin. If a plugin applies replication origin filters, we
+             * might get a false positive (i.e., erroneously considering a
+             * slot caught up). However, such cases are very rare, and the
+             * impact of a false positive is minimal.
+             *
+             * The query returns the slot names and their caught-up status in
+             * the same order as the results collected by
+             * get_old_cluster_logical_slot_infos_query(). If this query is
+             * changed, we need to make sure that it remains consistent with
+             * that function.
+             */
+            const char *slot_caught_up_info_query =
+                "WITH check_caught_up AS ( "
+                " SELECT pg_catalog.binary_upgrade_check_logical_slot_pending_wal(slot_name, "
+                "        MAX(confirmed_flush_lsn) OVER ()) as last_pending_wal "
+                " FROM pg_replication_slots "
+                " WHERE slot_type = 'logical' AND "
+                "       database = current_database() AND "
+                "       temporary IS FALSE AND "
+                "       invalidation_reason IS NULL "
+                " ORDER BY confirmed_flush_lsn ASC "
+                " LIMIT 1 "
+                ") "
+                "SELECT slot_name, "
+                "       CASE "
+                "         WHEN invalidation_reason IS NULL THEN "
+                "           last_pending_wal IS NULL OR "
+                "           confirmed_flush_lsn > last_pending_wal "
+                "         ELSE FALSE "
+                "       END as caught_up "
+                "FROM pg_replication_slots, check_caught_up "
+                "WHERE slot_type = 'logical' AND "
+                "      database = current_database() AND "
+                "      temporary IS FALSE "
+                "ORDER BY 1";
+
+            upgrade_task_add_step(task,
+                                  slot_caught_up_info_query,
+                                  process_old_cluster_logical_slot_catchup_infos,
+                                  true, NULL);
+        }
     }
 
     upgrade_task_run(task, cluster);
@@ -682,15 +748,29 @@ process_rel_infos(DbInfo *dbinfo, PGresult *res, void *arg)
  * logical replication slots in the database, for use by
  * get_db_rel_and_slot_infos()'s UpgradeTask. The status of each logical slot
  * is checked in check_old_cluster_for_valid_slots().
+ *
+ * use_fast_caught_up_check is set to true on return if the fast caught-up
+ * check is available in the given cluster.
  */
 static char *
-get_old_cluster_logical_slot_infos_query(void)
+get_old_cluster_logical_slot_infos_query(ClusterInfo *cluster, bool *use_fast_caught_up_check)
 {
+    /*
+     * The faster method for the slot caught-up check is available on PG19 or
+     * newer.
+     */
+    *use_fast_caught_up_check = (GET_MAJOR_VERSION(cluster->major_version) >= 1900);
+
     /*
      * Fetch the logical replication slot information. The check whether the
-     * slot is considered caught up is done by an upgrade function. This
-     * regards the slot as caught up if we don't find any decodable changes.
-     * See binary_upgrade_logical_slot_has_caught_up().
+     * slot is considered caught up is done by an upgrade function, unless the
+     * fast check is available on the cluster. This regards the slot as caught
+     * up if we don't find any decodable changes.
+     *
+     * Note that binary_upgrade_logical_slot_has_caught_up() is available only
+     * on PG18 or older. For PG19 or newer, *use_fast_caught_up_check is set
+     * to true and binary_upgrade_check_logical_slot_pending_wal() is used
+     * instead in a separate query (see slot_caught_up_info_query).
      *
      * Note that we can't ensure whether the slot is caught up during
      * live_check as the new WAL records could be generated.
@@ -708,16 +788,16 @@ get_old_cluster_logical_slot_infos_query(void)
             "FROM pg_catalog.pg_replication_slots "
             "WHERE slot_type = 'logical' AND "
             "database = current_database() AND "
-            "temporary IS FALSE;",
-            user_opts.live_check ? "FALSE" :
+            "temporary IS FALSE "
+            "ORDER BY 1;",
+            (*use_fast_caught_up_check || user_opts.live_check) ? "FALSE" :
             "(CASE WHEN invalidation_reason IS NOT NULL THEN FALSE "
             "ELSE (SELECT pg_catalog.binary_upgrade_logical_slot_has_caught_up(slot_name)) "
             "END)");
 }
 
 /*
- * Callback function for processing results of the query returned by
- * get_old_cluster_logical_slot_infos_query(), which is used for
+ * Callback function for processing results of the query, which is used for
  * get_db_rel_and_slot_infos()'s UpgradeTask. This function stores the logical
  * slot information for later use.
  */
@@ -765,6 +845,44 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
     dbinfo->slot_arr.nslots = num_slots;
 }
 
+/*
+ * Callback function for processing results of the query, which is used for
+ * get_db_rel_and_slot_infos()'s UpgradeTask. This function updates the
+ * caught_up field of each slot collected by
+ * process_old_cluster_logical_slot_infos().
+ */
+static void
+process_old_cluster_logical_slot_catchup_infos(DbInfo *dbinfo, PGresult *res, void *arg)
+{
+    int         num_slots = PQntuples(res);
+
+    AssertVariableIsOfType(&process_old_cluster_logical_slot_catchup_infos,
+                           UpgradeTaskProcessCB);
+    Assert(num_slots == dbinfo->slot_arr.nslots);
+
+    /*
+     * Update the caught_up field of each logical slot. The caught_up values
+     * are retrieved in the same order as the slots were collected in
+     * process_old_cluster_logical_slot_infos(), so we can update the slots
+     * sequentially.
+     */
+    for (int i = 0; i < num_slots; i++)
+    {
+        LogicalSlotInfo *s = &(dbinfo->slot_arr.slots[i]);
+        char       *slotname;
+        bool        caught_up;
+
+        slotname = PQgetvalue(res, i, PQfnumber(res, "slot_name"));
+        caught_up = (strcmp(PQgetvalue(res, i, PQfnumber(res, "caught_up")), "t") == 0);
+
+        /* Sanity check */
+        if (strcmp(slotname, s->slotname) != 0)
+            pg_fatal("tried to update logical slot \"%s\", expected \"%s\"",
+                     slotname, s->slotname);
+
+        s->caught_up = caught_up;
+    }
+}
 
 /*
  * count_old_cluster_logical_slots()
@@ -773,7 +891,7 @@ process_old_cluster_logical_slot_infos(DbInfo *dbinfo, PGresult *res, void *arg)
  *
  * Note: this function always returns 0 if the old_cluster is PG16 and prior
  * because we gather slot information only for cluster versions greater than or
- * equal to PG17. See get_old_cluster_logical_slot_infos().
+ * equal to PG17. See get_db_rel_and_slot_infos().
  */
 int
 count_old_cluster_logical_slots(void)
diff --git a/src/bin/pg_upgrade/t/003_logical_slots.pl b/src/bin/pg_upgrade/t/003_logical_slots.pl
index b9abc3a2e21..15e6d267f2f 100644
--- a/src/bin/pg_upgrade/t/003_logical_slots.pl
+++ b/src/bin/pg_upgrade/t/003_logical_slots.pl
@@ -64,6 +64,7 @@ $oldpub->safe_psql(
 	'postgres', qq[
 	SELECT pg_create_logical_replication_slot('test_slot1', 'test_decoding');
 	SELECT pg_create_logical_replication_slot('test_slot2', 'test_decoding');
+	SELECT pg_create_logical_replication_slot('test_slot3', 'test_decoding');
 ]);
 $oldpub->stop();
 
@@ -77,7 +78,7 @@ command_checks_all(
 	[@pg_upgrade_cmd],
 	1,
 	[
-		qr/"max_replication_slots" \(1\) must be greater than or equal to the number of logical replication slots \(2\) on the old cluster/
 	],
	[qr//],
 	'run of pg_upgrade where the new cluster has insufficient "max_replication_slots"'
@@ -85,29 +86,31 @@ command_checks_all(
 ok(-d $newpub->data_dir . "/pg_upgrade_output.d",
 	"pg_upgrade_output.d/ not removed after pg_upgrade failure");
 
-# Set 'max_replication_slots' to match the number of slots (2) present on the
+# Set 'max_replication_slots' to match the number of slots (3) present on the
 # old cluster. Both slots will be used for subsequent tests.
-$newpub->append_conf('postgresql.conf', "max_replication_slots = 2");
+$newpub->append_conf('postgresql.conf', "max_replication_slots = 3");
 
 # ------------------------------
 # TEST: Confirm pg_upgrade fails when the slot still has unconsumed WAL records
 
 # Preparations for the subsequent test:
-# 1. Generate extra WAL records. At this point neither test_slot1 nor
-#    test_slot2 has consumed them.
+# 1. Generate extra WAL records. At this point none of the slots has consumed them.
 #
 # 2. Advance the slot test_slot2 up to the current WAL location, but test_slot1
 #    still has unconsumed WAL records.
 #
 # 3. Emit a non-transactional message. This will cause test_slot2 to detect the
 #    unconsumed WAL record.
+#
+# 4. Advance the slot test_slot3 up to the current WAL location.
 $oldpub->start;
 $oldpub->safe_psql(
 	'postgres', qq[
 	CREATE TABLE tbl AS SELECT generate_series(1, 10) AS a;
 	SELECT pg_replication_slot_advance('test_slot2', pg_current_wal_lsn());
-	SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message');
+	SELECT count(*) FROM pg_logical_emit_message('false', 'prefix', 'This is a non-transactional message', true);
+	SELECT pg_replication_slot_advance('test_slot3', pg_current_wal_lsn());
 ]);
 $oldpub->stop;
 
@@ -138,8 +141,9 @@ find(
 	},
 	$newpub->data_dir . "/pg_upgrade_output.d");
 
-# Check the file content. Both slots should be reporting that they have
-# unconsumed WAL records.
+# Check the file content. While test_slot1 and test_slot2 should be reported
+# as having unconsumed WAL records, test_slot3 should not be reported because
+# it has caught up (it was advanced to the current WAL location in step 4).
 like(
 	slurp_file($slots_filename),
 	qr/The slot \"test_slot1\" has not consumed the WAL yet/m,
@@ -148,6 +152,10 @@ like(
 	slurp_file($slots_filename),
 	qr/The slot \"test_slot2\" has not consumed the WAL yet/m,
 	'the previous test failed due to unconsumed WALs');
+unlike(
+	slurp_file($slots_filename),
+	qr/test_slot3/m,
+	'caught-up slot is not reported');
 
 
 # ------------------------------
@@ -162,6 +170,7 @@ $oldpub->safe_psql(
 	'postgres', qq[
 	SELECT * FROM pg_drop_replication_slot('test_slot1');
 	SELECT * FROM pg_drop_replication_slot('test_slot2');
+	SELECT * FROM pg_drop_replication_slot('test_slot3');
 	CREATE PUBLICATION regress_pub FOR ALL TABLES;
 ]);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 894b6a1b6d6..36e4ed67c8a 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -11832,9 +11832,9 @@
   proparallel => 'u', prorettype => 'void', proargtypes => 'oid',
   prosrc => 'binary_upgrade_set_next_pg_tablespace_oid' },
 { oid => '6312', descr => 'for use by pg_upgrade',
-  proname => 'binary_upgrade_logical_slot_has_caught_up', provolatile => 'v',
-  proparallel => 'u', prorettype => 'bool', proargtypes => 'name',
-  prosrc => 'binary_upgrade_logical_slot_has_caught_up' },
+  proname => 'binary_upgrade_check_logical_slot_pending_wal', provolatile => 'v',
+  proparallel => 'u', prorettype => 'pg_lsn', proargtypes => 'name pg_lsn',
+  prosrc => 'binary_upgrade_check_logical_slot_pending_wal' },
 { oid => '6319',
   descr => 'for use by pg_upgrade (relation for pg_subscription_rel)',
   proname => 'binary_upgrade_add_sub_rel_state', proisstrict => 'f',
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 5b43e181135..45f174730d4 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -148,7 +148,8 @@ extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId
 extern void ResetLogicalStreamingState(void);
 extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
-extern bool LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal);
+extern XLogRecPtr LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal,
+                                                      XLogRecPtr scan_cutoff_lsn);
 extern XLogRecPtr LogicalSlotAdvanceAndCheckSnapState(XLogRecPtr moveto,
                                                       bool *found_consistent_snapshot);
-- 
2.47.3
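
As a rough illustration of the control-flow change in LogicalReplicationSlotHasPendingWal() above: this hedged Python model (the `(lsn, decodable)` tuples are invented stand-ins, not real WAL internals) shows how the rewritten loop tracks the LAST decodable record's LSN instead of stopping at the first one, while still breaking early once a decodable record at or beyond scan_cutoff_lsn (the maximum confirmed_flush_lsn) is seen, since such a record already proves that no slot has caught up.

```python
def last_pending_wal(records, end_of_wal, scan_cutoff_lsn):
    """Model of the rewritten scan loop.

    records: iterable of (lsn, decodable) pairs in ascending LSN order.
    Returns the last decodable record's LSN below end_of_wal, or None
    (standing in for InvalidXLogRecPtr) if none is found.
    """
    assert end_of_wal >= scan_cutoff_lsn   # mirrors the added Assert()
    last = None
    for lsn, decodable in records:
        if lsn >= end_of_wal:
            break                          # reached the end of the scan range
        if decodable:
            last = lsn                     # remember the last decodable record
            if lsn >= scan_cutoff_lsn:
                break                      # early cutoff: no slot can be caught up
    return last


records = [(10, False), (20, True), (30, False), (40, True), (50, True)]
# The record at LSN 50 is decodable and >= the cutoff, so the scan stops
# there and reports 50 as the last pending LSN.
print(last_pending_wal(records, end_of_wal=100, scan_cutoff_lsn=45))  # -> 50
```

A slot with confirmed_flush_lsn above the returned LSN is then considered caught up, which is exactly the comparison the patch's slot_caught_up_info_query performs per slot.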
