From 1a4fe8c190ae6f08d881c1e423b9f530c122aff8 Mon Sep 17 00:00:00 2001
From: Shlok Kyal <shlok.kyal.oss@gmail.com>
Date: Fri, 24 May 2024 14:08:07 +0530
Subject: [PATCH v3 1/5] Improve the code that checks if the recovery is
 finishing

The recovery process has a window between the moment the walreceiver
shuts down and the moment pg_is_in_recovery() starts returning false.
This means that the pg_stat_wal_receiver checks can cause the server to
finish the recovery (even if it has already reached the recovery
target). So, instead of using pg_stat_wal_receiver, use
pg_last_wal_receive_lsn() to get the last received LSN on the standby,
then check that consistent_lsn is already present on the standby and
that the recovery has finished.
---
 src/bin/pg_basebackup/pg_createsubscriber.c | 63 +++++++++++++--------
 1 file changed, 39 insertions(+), 24 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c
index 90cc580811..d602aac405 100644
--- a/src/bin/pg_basebackup/pg_createsubscriber.c
+++ b/src/bin/pg_basebackup/pg_createsubscriber.c
@@ -29,6 +29,7 @@
 #include "getopt_long.h"
 
 #define	DEFAULT_SUB_PORT	"50432"
+#define NUM_ATTEMPTS		60
 
 /* Command-line options */
 struct CreateSubscriberOptions
@@ -94,7 +95,8 @@ static void start_standby_server(const struct CreateSubscriberOptions *opt,
 								 bool restricted_access);
 static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
-								  const struct CreateSubscriberOptions *opt);
+								  const struct CreateSubscriberOptions *opt,
+								  const char *consistent_lsn);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void drop_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
 static void create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo);
@@ -1362,57 +1364,70 @@ stop_standby_server(const char *datadir)
  * the recovery process. By default, it waits forever.
  */
 static void
-wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt)
+wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt, const char *consistent_lsn)
 {
 	PGconn	   *conn;
+	int			len1;
 	int			status = POSTMASTER_STILL_STARTING;
 	int			timer = 0;
-	int			count = 0;		/* number of consecutive connection attempts */
-
-#define NUM_CONN_ATTEMPTS	10
+	uint32		id;
+	uint32		off;
+	uint64		target_lsn;
 
 	pg_log_info("waiting for the target server to reach the consistent state");
 
+	/*
+	 * Get consistent_lsn as unsigned 64-bit integer.
+	 */
+	if (!dry_run)
+	{
+		len1 = strspn(consistent_lsn, "0123456789abcdefABCDEF");
+		id = (uint32) strtoul(consistent_lsn, NULL, 16);
+		off = (uint32) strtoul(consistent_lsn + len1 + 1, NULL, 16);
+		target_lsn = ((uint64) id << 32) | off;
+	}
+
 	conn = connect_database(conninfo, true);
 
 	for (;;)
 	{
 		PGresult   *res;
+		uint64	   	lsn;
 		bool		in_recovery = server_is_in_recovery(conn);
 
 		/*
-		 * Does the recovery process finish? In dry run mode, there is no
-		 * recovery mode. Bail out as the recovery process has ended.
+		 * In dry run mode, there is no recovery mode.
 		 */
-		if (!in_recovery || dry_run)
-		{
+		if (dry_run) {
 			status = POSTMASTER_READY;
 			recovery_ended = true;
 			break;
 		}
 
 		/*
-		 * If it is still in recovery, make sure the target server is
-		 * connected to the primary so it can receive the required WAL to
-		 * finish the recovery process. If it is disconnected try
-		 * NUM_CONN_ATTEMPTS in a row and bail out if not succeed.
+		 * If the consistent_lsn is already present on standby and the
+		 * recovery is finished then bail out.
 		 */
 		res = PQexec(conn,
-					 "SELECT 1 FROM pg_catalog.pg_stat_wal_receiver");
-		if (PQntuples(res) == 0)
+					 "SELECT pg_catalog.pg_last_wal_receive_lsn() - '0/0'");
+		if (PQresultStatus(res) != PGRES_TUPLES_OK)
 		{
-			if (++count > NUM_CONN_ATTEMPTS)
-			{
-				stop_standby_server(subscriber_dir);
-				pg_log_error("standby server disconnected from the primary");
-				break;
-			}
+			pg_log_error("could not get last wal recieved on standby: %s",
+						 PQresultErrorMessage(res));
+			PQclear(res);
+			break;
 		}
-		else
-			count = 0;			/* reset counter if it connects again */
 
+		lsn = strtoul(PQgetvalue(res, 0, 0), NULL, 10);
 		PQclear(res);
 
+		if(lsn >= target_lsn && !in_recovery)
+		{
+			status = POSTMASTER_READY;
+			recovery_ended = true;
+			break;
+		}
+
 		/* Bail out after recovery_timeout seconds if this option is set */
 		if (opt->recovery_timeout > 0 && timer >= opt->recovery_timeout)
 		{
@@ -2121,7 +2136,7 @@ main(int argc, char **argv)
 	start_standby_server(&opt, true);
 
 	/* Waiting the subscriber to be promoted */
-	wait_for_end_recovery(dbinfo[0].subconninfo, &opt);
+	wait_for_end_recovery(dbinfo[0].subconninfo, &opt, consistent_lsn);
 
 	/*
 	 * Create the subscription for each database on subscriber. It does not
-- 
2.34.1

