From 4511eb68cf8471ed31f0a690b507b7fdd9ccc9e7 Mon Sep 17 00:00:00 2001
From: Shveta Malik <shveta.malik@gmail.com>
Date: Thu, 14 Mar 2024 11:20:00 +0530
Subject: [PATCH v1] Use user as dbname for slot sync

---
 .../libpqwalreceiver/libpqwalreceiver.c       | 45 ++-----------
 src/backend/replication/logical/slotsync.c    | 63 ++++++-------------
 src/backend/replication/slotfuncs.c           |  2 -
 src/include/replication/slotsync.h            |  1 -
 src/include/replication/walreceiver.h         | 12 ++--
 src/interfaces/libpq/exports.txt              |  1 +
 src/interfaces/libpq/fe-exec.c                |  9 +++
 src/interfaces/libpq/libpq-fe.h               |  1 +
 8 files changed, 42 insertions(+), 92 deletions(-)

diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 761bf0f677..9c28f956c1 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -59,7 +59,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
 									char **sender_host, int *sender_port);
 static char *libpqrcv_identify_system(WalReceiverConn *conn,
 									  TimeLineID *primary_tli);
-static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);
+static char *libpqrcv_get_dbname_from_conn(WalReceiverConn *conn);
 static int	libpqrcv_server_version(WalReceiverConn *conn);
 static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
 											 TimeLineID tli, char **filename,
@@ -102,7 +102,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
 	.walrcv_send = libpqrcv_send,
 	.walrcv_create_slot = libpqrcv_create_slot,
 	.walrcv_alter_slot = libpqrcv_alter_slot,
-	.walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
+	.walrcv_get_dbname_from_conn = libpqrcv_get_dbname_from_conn,
 	.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
 	.walrcv_exec = libpqrcv_exec,
 	.walrcv_disconnect = libpqrcv_disconnect
@@ -494,47 +494,12 @@ libpqrcv_server_version(WalReceiverConn *conn)
 }
 
 /*
- * Get database name from the primary server's conninfo.
- *
- * If dbname is not found in connInfo, return NULL value.
+ * Get database name from the connection.
  */
 static char *
-libpqrcv_get_dbname_from_conninfo(const char *connInfo)
+libpqrcv_get_dbname_from_conn(WalReceiverConn *conn)
 {
-	PQconninfoOption *opts;
-	char	   *dbname = NULL;
-	char	   *err = NULL;
-
-	opts = PQconninfoParse(connInfo, &err);
-	if (opts == NULL)
-	{
-		/* The error string is malloc'd, so we must free it explicitly */
-		char	   *errcopy = err ? pstrdup(err) : "out of memory";
-
-		PQfreemem(err);
-		ereport(ERROR,
-				(errcode(ERRCODE_SYNTAX_ERROR),
-				 errmsg("invalid connection string syntax: %s", errcopy)));
-	}
-
-	for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
-	{
-		/*
-		 * If multiple dbnames are specified, then the last one will be
-		 * returned
-		 */
-		if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
-			*opt->val)
-		{
-			if (dbname)
-				pfree(dbname);
-
-			dbname = pstrdup(opt->val);
-		}
-	}
-
-	PQconninfoFree(opts);
-	return dbname;
+	return PQgetDbname(conn->streamConn);
 }
 
 /*
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 5074c8409f..57c59457c8 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -891,33 +891,6 @@ validate_remote_info(WalReceiverConn *wrconn)
 		CommitTransactionCommand();
 }
 
-/*
- * Checks if dbname is specified in 'primary_conninfo'.
- *
- * Error out if not specified otherwise return it.
- */
-char *
-CheckAndGetDbnameFromConninfo(void)
-{
-	char	   *dbname;
-
-	/*
-	 * The slot synchronization needs a database connection for walrcv_exec to
-	 * work.
-	 */
-	dbname = walrcv_get_dbname_from_conninfo(PrimaryConnInfo);
-	if (dbname == NULL)
-		ereport(ERROR,
-
-		/*
-		 * translator: dbname is a specific option; %s is a GUC variable name
-		 */
-				errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-				errmsg("slot synchronization requires dbname to be specified in %s",
-					   "primary_conninfo"));
-	return dbname;
-}
-
 /*
  * Return true if all necessary GUCs for slot synchronization are set
  * appropriately, otherwise, return false.
@@ -1228,22 +1201,6 @@ ReplSlotSyncWorkerMain(int argc, char *argv[])
 	 */
 	SetConfigOption("search_path", "", PGC_SUSET, PGC_S_OVERRIDE);
 
-	dbname = CheckAndGetDbnameFromConninfo();
-
-	/*
-	 * Connect to the database specified by the user in primary_conninfo. We
-	 * need a database connection for walrcv_exec to work which we use to
-	 * fetch slot information from the remote node. See comments atop
-	 * libpqrcv_exec.
-	 *
-	 * We do not specify a specific user here since the slot sync worker will
-	 * operate as a superuser. This is safe because the slot sync worker does
-	 * not interact with user tables, eliminating the risk of executing
-	 * arbitrary code within triggers.
-	 */
-	InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
-
-	SetProcessingMode(NormalProcessing);
 
 	initStringInfo(&app_name);
 	if (cluster_name[0])
@@ -1264,6 +1221,26 @@ ReplSlotSyncWorkerMain(int argc, char *argv[])
 				errcode(ERRCODE_CONNECTION_FAILURE),
 				errmsg("could not connect to the primary server: %s", err));
 
+	/* There must be dbName initialized in wrconn */
+	dbname = walrcv_get_dbname_from_conn(wrconn);
+	Assert(dbname);
+
+	/*
+	 * Connect to the database which is used by libpq to make connection(wrconn).
+	 * We need a database connection for walrcv_exec to work which we use to
+	 * fetch slot information from the remote node. See comments atop
+	 * libpqrcv_exec.
+	 *
+	 * We do not specify a specific user here since the slot sync worker will
+	 * operate as a superuser. This is safe because the slot sync worker does
+	 * not interact with user tables, eliminating the risk of executing
+	 * arbitrary code within triggers.
+	 */
+	InitPostgres(dbname, InvalidOid, NULL, InvalidOid, 0, NULL);
+
+	SetProcessingMode(NormalProcessing);
+
+
 	/*
 	 * Register the failure callback once we have the connection.
 	 *
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index ad79e1fccd..205e790309 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -977,8 +977,6 @@ pg_sync_replication_slots(PG_FUNCTION_ARGS)
 	/* Load the libpq-specific functions */
 	load_file("libpqwalreceiver", false);
 
-	(void) CheckAndGetDbnameFromConninfo();
-
 	initStringInfo(&app_name);
 	if (cluster_name[0])
 		appendStringInfo(&app_name, "%s_slotsync", cluster_name);
diff --git a/src/include/replication/slotsync.h b/src/include/replication/slotsync.h
index dca57c5020..3f556a9623 100644
--- a/src/include/replication/slotsync.h
+++ b/src/include/replication/slotsync.h
@@ -23,7 +23,6 @@ extern PGDLLIMPORT bool sync_replication_slots;
 extern PGDLLIMPORT char *PrimaryConnInfo;
 extern PGDLLIMPORT char *PrimarySlotName;
 
-extern char *CheckAndGetDbnameFromConninfo(void);
 extern bool ValidateSlotSyncParams(int elevel);
 
 #ifdef EXEC_BACKEND
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index b906bb5ce8..6ede80c250 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -283,11 +283,11 @@ typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
 											TimeLineID *primary_tli);
 
 /*
- * walrcv_get_dbname_from_conninfo_fn
+ * walrcv_get_dbname_from_conn_fn
  *
- * Returns the database name from the primary_conninfo
+ * Returns the database name from the connection.
  */
-typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
+typedef char *(*walrcv_get_dbname_from_conn_fn) (WalReceiverConn *conn);
 
 /*
  * walrcv_server_version_fn
@@ -413,7 +413,7 @@ typedef struct WalReceiverFunctionsType
 	walrcv_get_conninfo_fn walrcv_get_conninfo;
 	walrcv_get_senderinfo_fn walrcv_get_senderinfo;
 	walrcv_identify_system_fn walrcv_identify_system;
-	walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
+	walrcv_get_dbname_from_conn_fn walrcv_get_dbname_from_conn;
 	walrcv_server_version_fn walrcv_server_version;
 	walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
 	walrcv_startstreaming_fn walrcv_startstreaming;
@@ -439,8 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
 	WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
 #define walrcv_identify_system(conn, primary_tli) \
 	WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
-#define walrcv_get_dbname_from_conninfo(conninfo) \
-	WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
+#define walrcv_get_dbname_from_conn(conn) \
+	WalReceiverFunctions->walrcv_get_dbname_from_conn(conn)
 #define walrcv_server_version(conn) \
 	WalReceiverFunctions->walrcv_server_version(conn)
 #define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
diff --git a/src/interfaces/libpq/exports.txt b/src/interfaces/libpq/exports.txt
index 9fbd3d3407..e879116a0d 100644
--- a/src/interfaces/libpq/exports.txt
+++ b/src/interfaces/libpq/exports.txt
@@ -202,3 +202,4 @@ PQcancelSocket            199
 PQcancelErrorMessage      200
 PQcancelReset             201
 PQcancelFinish            202
+PQgetDbname               203
diff --git a/src/interfaces/libpq/fe-exec.c b/src/interfaces/libpq/fe-exec.c
index c02a9180b2..4b61cebdf7 100644
--- a/src/interfaces/libpq/fe-exec.c
+++ b/src/interfaces/libpq/fe-exec.c
@@ -2917,6 +2917,15 @@ PQendcopy(PGconn *conn)
 }
 
 
+char *
+PQgetDbname(PGconn *conn)
+{
+	if (!conn)
+		return 0;
+
+	return conn->dbName;
+}
+
 /* ----------------
  *		PQfn -	Send a function call to the POSTGRES backend.
  *
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index 09b485bd2b..eac2984d1c 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -516,6 +516,7 @@ extern int	PQputline(PGconn *conn, const char *string);
 extern int	PQgetlineAsync(PGconn *conn, char *buffer, int bufsize);
 extern int	PQputnbytes(PGconn *conn, const char *buffer, int nbytes);
 extern int	PQendcopy(PGconn *conn);
+extern char *PQgetDbname(PGconn *conn);
 
 /* Set blocking/nonblocking connection to the backend */
 extern int	PQsetnonblocking(PGconn *conn, int arg);
-- 
2.34.1

