From 43a2c0f21bf77e30084aaeaf06a99dc8cda09b86 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Fri, 2 May 2025 16:45:31 +0500
Subject: [PATCH v2 1/3] Allow checking standby sync before making data visible
 after crash recovery

A crash may happen while synchronisation between the primary and its
standbys is lost. In that case we should not allow queries to see data
that was not acknowledged by the quorum. To ensure this, do not let in
non-superuser connections until the standby quorum is restored.

This behavior is only needed for HA clusters, so it is disabled by
default and controlled by the startup_synchronous_standby_level GUC.
---
 src/backend/access/transam/xact.c   |  1 +
 src/backend/access/transam/xlog.c   | 22 +++++++++++++++-
 src/backend/replication/syncrep.c   | 40 +++++++++++++++++++++++++++++
 src/backend/utils/init/postinit.c   | 11 ++++++++
 src/backend/utils/misc/guc_tables.c | 10 ++++++++
 src/include/access/xact.h           |  1 +
 src/include/access/xlogrecovery.h   |  4 +++
 src/include/replication/syncrep.h   |  3 +++
 8 files changed, 91 insertions(+), 1 deletion(-)

diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index b885513f765..8962b311361 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -85,6 +85,7 @@ bool		DefaultXactDeferrable = false;
 bool		XactDeferrable;
 
 int			synchronous_commit = SYNCHRONOUS_COMMIT_ON;
+int			startup_synchronous_commit = SYNCHRONOUS_COMMIT_OFF;	/* set by the startup_synchronous_standby_level GUC */
 
 /*
  * CheckXidAlive is a xid value pointing to a possibly ongoing (sub)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 2d4c346473b..99cc519184d 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -562,6 +562,10 @@ typedef struct XLogCtlData
 	XLogRecPtr	lastFpwDisableRecPtr;
 
 	slock_t		info_lck;		/* locks shared variables shown above */
+
+	/* end-of-recovery LSN and startup sync-rep flag; not under info_lck */
+	XLogRecPtr	recoveryEndOfLog;
+	bool		syncRepEstablished;
 } XLogCtlData;
 
 /*
@@ -6040,7 +6044,8 @@ StartupXLOG(void)
 	 * Finish WAL recovery.
 	 */
 	endOfRecoveryInfo = FinishWalRecovery();
-	EndOfLog = endOfRecoveryInfo->endOfLog;
+	Assert(!XLogCtl->syncRepEstablished);
+	XLogCtl->recoveryEndOfLog = EndOfLog = endOfRecoveryInfo->endOfLog;
 	EndOfLogTLI = endOfRecoveryInfo->endOfLogTLI;
 	abortedRecPtr = endOfRecoveryInfo->abortedRecPtr;
 	missingContrecPtr = endOfRecoveryInfo->missingContrecPtr;
@@ -9687,3 +9692,37 @@ SetWalWriterSleeping(bool sleeping)
 	XLogCtl->WalWriterSleeping = sleeping;
 	SpinLockRelease(&XLogCtl->info_lck);
 }
+
+/*
+ * GetEndOfRecoveryPtr
+ *		Return the end-of-log position that StartupXLOG() stored in XLogCtl
+ *		after WAL recovery finished.
+ *
+ * NOTE(review): the field is read without taking any lock; confirm that is
+ * safe for all readers.
+ */
+XLogRecPtr
+GetEndOfRecoveryPtr(void)
+{
+	return XLogCtl->recoveryEndOfLog;
+}
+
+/*
+ * SetSyncRepEstablished
+ *		Set the syncRepEstablished flag in XLogCtl.
+ */
+void
+SetSyncRepEstablished(void)
+{
+	XLogCtl->syncRepEstablished = true;
+}
+
+/*
+ * IsSyncRepEstablished
+ *		Return the current value of the syncRepEstablished flag in XLogCtl.
+ */
+bool
+IsSyncRepEstablished(void)
+{
+	return XLogCtl->syncRepEstablished;
+}
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index cc35984ad00..4ed1d582a11 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -75,6 +75,7 @@
 #include <unistd.h>
 
 #include "access/xact.h"
+#include "access/xlogrecovery.h"
 #include "common/int.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -1138,3 +1139,63 @@ assign_synchronous_commit(int newval, void *extra)
 			break;
 	}
 }
+
+/*
+ * StartupSyncRepEstablished
+ *		Check whether synchronous replication has caught up with the end of
+ *		WAL recovery at the level chosen by startup_synchronous_standby_level.
+ *
+ * Returns true without any further check when the setting does not select a
+ * remote write/flush/apply level, or when no end-of-recovery position has
+ * been recorded yet.  Otherwise returns true, and records that fact with
+ * SetSyncRepEstablished(), when either no synchronous standby is defined or
+ * the LSN published in WalSndCtl->lsn[] for the selected level has reached
+ * the end-of-recovery position; returns false if it has not.
+ */
+bool
+StartupSyncRepEstablished(void)
+{
+	XLogRecPtr	recovery_end;
+	XLogRecPtr	confirmed_lsn;
+	bool		standbys_defined;
+	int			mode;
+
+	switch (startup_synchronous_commit)
+	{
+		case SYNCHRONOUS_COMMIT_REMOTE_WRITE:
+			mode = SYNC_REP_WAIT_WRITE;
+			break;
+		case SYNCHRONOUS_COMMIT_REMOTE_FLUSH:
+			mode = SYNC_REP_WAIT_FLUSH;
+			break;
+		case SYNCHRONOUS_COMMIT_REMOTE_APPLY:
+			mode = SYNC_REP_WAIT_APPLY;
+			break;
+		default:
+			/* Not a synchronous level: nothing to wait for. */
+			return true;
+	}
+
+	/*
+	 * The end-of-recovery position is only stored once StartupXLOG() has
+	 * finished WAL recovery.  A backend that gets here earlier has nothing
+	 * to compare against, so let it in, but do not mark replication as
+	 * established: StartupXLOG() asserts the flag is still clear.
+	 */
+	recovery_end = GetEndOfRecoveryPtr();
+	if (recovery_end == InvalidXLogRecPtr)
+		return true;
+
+	/* Take both values under one lock hold so they are mutually consistent. */
+	LWLockAcquire(SyncRepLock, LW_SHARED);
+	standbys_defined =
+		(WalSndCtl->sync_standbys_status & SYNC_STANDBY_DEFINED) != 0;
+	confirmed_lsn = WalSndCtl->lsn[mode];
+	LWLockRelease(SyncRepLock);
+
+	if (standbys_defined && confirmed_lsn < recovery_end)
+		return false;
+
+	SetSyncRepEstablished();
+	return true;
+}
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 01309ef3f86..10b354fe069 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -42,6 +42,7 @@
 #include "postmaster/postmaster.h"
 #include "replication/slot.h"
 #include "replication/slotsync.h"
+#include "replication/syncrep.h"
 #include "replication/walsender.h"
 #include "storage/aio_subsys.h"
 #include "storage/bufmgr.h"
@@ -1214,6 +1215,16 @@ InitPostgres(const char *in_dbname, Oid dboid,
 	if (PostAuthDelay > 0)
 		pg_usleep(PostAuthDelay * 1000000L);
 
+	/* Check if we need to wait for startup synchronous replication */
+	if (!am_walsender &&
+		!superuser() &&
+		!StartupSyncRepEstablished())
+	{
+		ereport(FATAL,
+				(errcode(ERRCODE_CANNOT_CONNECT_NOW),
+				 errmsg("cannot connect until synchronous replication is established with standbys according to startup_synchronous_standby_level")));
+	}
+
 	/*
 	 * Initialize various default states that can't be set up until we've
 	 * selected the active user and gotten the right GUC settings.
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 60b12446a1c..23e20261013 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -5173,6 +5173,16 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, assign_synchronous_commit, NULL
 	},
 
+	{
+		{"startup_synchronous_standby_level", PGC_SUSET, REPLICATION_PRIMARY,
+			gettext_noop("Sets the standby synchronization level necessary to accept non-superuser connections after startup."),
+			gettext_noop("Non-superuser connections are rejected until synchronous replication is established at this level.")
+		},
+		&startup_synchronous_commit,
+		SYNCHRONOUS_COMMIT_OFF, synchronous_commit_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"archive_mode", PGC_POSTMASTER, WAL_ARCHIVING,
 			gettext_noop("Allows archiving of WAL files using \"archive_command\"."),
diff --git a/src/include/access/xact.h b/src/include/access/xact.h
index b2bc10ee041..feb47c80dfe 100644
--- a/src/include/access/xact.h
+++ b/src/include/access/xact.h
@@ -81,6 +81,7 @@ typedef enum
 
 /* Synchronous commit level */
 extern PGDLLIMPORT int synchronous_commit;
+extern PGDLLIMPORT int startup_synchronous_commit;
 
 /* used during logical streaming of a transaction */
 extern PGDLLIMPORT TransactionId CheckXidAlive;
diff --git a/src/include/access/xlogrecovery.h b/src/include/access/xlogrecovery.h
index 91446303024..0539c7dd707 100644
--- a/src/include/access/xlogrecovery.h
+++ b/src/include/access/xlogrecovery.h
@@ -155,4 +155,8 @@ extern void RecoveryRequiresIntParameter(const char *param_name, int currValue,
 
 extern void xlog_outdesc(StringInfo buf, XLogReaderState *record);
 
+extern XLogRecPtr GetEndOfRecoveryPtr(void);	/* defined in xlog.c */
+extern void SetSyncRepEstablished(void);	/* defined in xlog.c */
+extern bool IsSyncRepEstablished(void);	/* defined in xlog.c */
+
 #endif							/* XLOGRECOVERY_H */
diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h
index 675669a79f7..bc0205d5fa3 100644
--- a/src/include/replication/syncrep.h
+++ b/src/include/replication/syncrep.h
@@ -107,4 +107,7 @@ extern void syncrep_yyerror(SyncRepConfigData **syncrep_parse_result_p, char **s
 extern void syncrep_scanner_init(const char *str, yyscan_t *yyscannerp);
 extern void syncrep_scanner_finish(yyscan_t yyscanner);
 
+extern void assign_synchronous_commit(int newval, void *extra);	/* NOTE(review): GUC hooks are presumably already declared in utils/guc_hooks.h - confirm this is needed */
+extern bool StartupSyncRepEstablished(void);	/* defined in syncrep.c */
+
 #endif							/* _SYNCREP_H */
-- 
2.39.5 (Apple Git-154)

