From b816e48049cb41c9390761e5cb2546447ada38a2 Mon Sep 17 00:00:00 2001
From: Hari Babu <kommi.haribabu@gmail.com>
Date: Wed, 10 Apr 2019 23:19:09 +1000
Subject: [PATCH 6/8] Primary, prefer-standby and standby options

Add new options that check whether a server is in recovery mode
before considering it for a connection. To determine whether
the server is running in recovery mode, libpq sends the query
'SELECT pg_is_in_recovery()' to the server.
---
 doc/src/sgml/libpq.sgml               |  26 ++-
 src/interfaces/libpq/fe-connect.c     | 236 +++++++++++++++++++++++---
 src/interfaces/libpq/libpq-fe.h       |   8 +-
 src/interfaces/libpq/libpq-int.h      |   4 +-
 src/test/recovery/t/001_stream_rep.pl |  18 +-
 5 files changed, 262 insertions(+), 30 deletions(-)

diff --git a/doc/src/sgml/libpq.sgml b/doc/src/sgml/libpq.sgml
index 2be51b2a49..6b84accd08 100644
--- a/doc/src/sgml/libpq.sgml
+++ b/doc/src/sgml/libpq.sgml
@@ -1649,7 +1649,8 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
       <listitem>
        <para>
         The supported options for this parameter are, <literal>any</literal>,
-        <literal>read-write</literal>, <literal>prefer-read</literal> and <literal>read-only</literal>.
+        <literal>read-write</literal>, <literal>prefer-read</literal>, <literal>read-only</literal>,
+        <literal>primary</literal>, <literal>prefer-standby</literal> and <literal>standby</literal>.
         The default value of this parameter, <literal>any</literal>, regards
         all connections as acceptable. If multiple hosts were specified in the
         connection string, based on the specified value, any remaining servers
@@ -1682,6 +1683,29 @@ postgresql://%2Fvar%2Flib%2Fpostgresql/dbname
         If this parameter is set to <literal>read-only</literal>, only a connection
         in which read-only transactions are accepted by default.
        </para>
+
+       <para>
+        If this parameter is set to <literal>primary</literal>, only a connection to a
+        server that is not in recovery mode is considered acceptable.
+       </para>
+
+       <para>
+        If this parameter is set to <literal>prefer-standby</literal>, a connection to a
+        server that is in recovery mode is preferred. If no such connection can be found,
+        then a connection to a server that is not in recovery mode will be considered.
+       </para>
+
+       <para>
+        If this parameter is set to <literal>standby</literal>, only a connection to a
+        server that is in recovery mode is considered acceptable.
+       </para>
+
+       <para>
+        To find out whether the server is in recovery mode, the query <literal>SELECT pg_is_in_recovery()</literal>
+        is sent upon any successful connection; if it returns <literal>t</literal>, the server
+        is in recovery mode.
+       </para>
+
       </listitem>
      </varlistentry>
     </variablelist>
diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c
index 38cbb07828..a6e0959895 100644
--- a/src/interfaces/libpq/fe-connect.c
+++ b/src/interfaces/libpq/fe-connect.c
@@ -124,6 +124,7 @@ static int	ldapServiceLookup(const char *purl, PQconninfoOption *options,
 #define DefaultOption	""
 #define DefaultAuthtype		  ""
 #define DefaultTargetSessionAttrs	"any"
+#define DefaultTargetServerType	"any"
 #ifdef USE_SSL
 #define DefaultSSLMode "prefer"
 #else
@@ -340,7 +341,7 @@ static const internalPQconninfoOption PQconninfoOptions[] = {
 
 	{"target_session_attrs", "PGTARGETSESSIONATTRS",
 		DefaultTargetSessionAttrs, NULL,
-		"Target-Session-Attrs", "", 12, /* sizeof("prefer-read") = 12 */
+		"Target-Session-Attrs", "", 15, /* sizeof("prefer-standby") = 15 */
 	offsetof(struct pg_conn, target_session_attrs)},
 
 	/* Terminating entry --- MUST BE LAST */
@@ -1302,6 +1303,12 @@ connectOptions2(PGconn *conn)
 			conn->requested_session_type = SESSION_TYPE_PREFER_READ;
 		else if (strcmp(conn->target_session_attrs, "read-only") == 0)
 			conn->requested_session_type = SESSION_TYPE_READ_ONLY;
+		else if (strcmp(conn->target_session_attrs, "primary") == 0)
+			conn->requested_session_type = SESSION_TYPE_PRIMARY;
+		else if (strcmp(conn->target_session_attrs, "prefer-standby") == 0)
+			conn->requested_session_type = SESSION_TYPE_PREFER_STANDBY;
+		else if (strcmp(conn->target_session_attrs, "standby") == 0)
+			conn->requested_session_type = SESSION_TYPE_STANDBY;
 		else
 		{
 			conn->status = CONNECTION_BAD;
@@ -2198,6 +2205,7 @@ PQconnectPoll(PGconn *conn)
 		case CONNECTION_CHECK_WRITABLE:
 		case CONNECTION_CONSUME:
 		case CONNECTION_GSS_STARTUP:
+		case CONNECTION_CHECK_RECOVERY:
 			break;
 
 		default:
@@ -2237,19 +2245,19 @@ keep_going:						/* We will come back to here until there is
 
 		if (conn->whichhost + 1 >= conn->nconnhost)
 		{
-			if (conn->read_write_host_index >= 0)
+			if (conn->read_write_or_primary_host_index >= 0)
 			{
 				/*
 				 * Getting here means, failed to connect to read-only servers
 				 * and now try connect to read-write server again.
 				 */
-				conn->whichhost = conn->read_write_host_index;
+				conn->whichhost = conn->read_write_or_primary_host_index;
 
 				/*
 				 * Reset the host index value to avoid recursion during the
 				 * second connection attempt.
 				 */
-				conn->read_write_host_index = -2;
+				conn->read_write_or_primary_host_index = -2;
 			}
 			else
 			{
@@ -3506,7 +3514,9 @@ keep_going:						/* We will come back to here until there is
 				 * may just skip the test in that case.
 				 */
 				if (conn->sversion >= 70400 &&
-					conn->requested_session_type != SESSION_TYPE_ANY)
+					(conn->requested_session_type == SESSION_TYPE_READ_WRITE ||
+					 conn->requested_session_type == SESSION_TYPE_PREFER_READ ||
+					 conn->requested_session_type == SESSION_TYPE_READ_ONLY))
 				{
 					if (conn->sversion < 120000)
 					{
@@ -3549,7 +3559,7 @@ keep_going:						/* We will come back to here until there is
 						 * connections as it couldn't find any servers that
 						 * are default read-only.
 						 */
-						if (conn->read_write_host_index == -2)
+						if (conn->read_write_or_primary_host_index == -2)
 							goto consume_checked_target_connection;
 
 						/* Append error report to conn->errorMessage. */
@@ -3580,8 +3590,8 @@ keep_going:						/* We will come back to here until there is
 
 						/* Record read-write host index */
 						if (conn->requested_session_type == SESSION_TYPE_PREFER_READ &&
-							conn->read_write_host_index == -1)
-							conn->read_write_host_index = conn->whichhost;
+							conn->read_write_or_primary_host_index == -1)
+							conn->read_write_or_primary_host_index = conn->whichhost;
 
 						/*
 						 * Try next host if any, but we don't want to consider
@@ -3596,30 +3606,70 @@ keep_going:						/* We will come back to here until there is
 				}
 
 				/*
-				 * Requested type is prefer-read, then record this host index
-				 * and try the other before considering it later. If requested
-				 * type of connection is read-only, ignore this connection.
+				 * Servers before 9.0 don't support recovery, so skip the check
+				 * when the requested type of connection is primary,
+				 * prefer-standby or standby.
 				 */
+				else if ((conn->sversion >= 90000 &&
+						  (conn->requested_session_type == SESSION_TYPE_PRIMARY ||
+						   conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY ||
+						   conn->requested_session_type == SESSION_TYPE_STANDBY)))
+				{
+					/*
+					 * Save existing error messages across the PQsendQuery
+					 * attempt.  This is necessary because PQsendQuery is
+					 * going to reset conn->errorMessage, so we would lose
+					 * error messages related to previous hosts we have tried
+					 * and failed to connect to.
+					 */
+					if (!saveErrorMessage(conn, &savedMessage))
+						goto error_return;
+
+					conn->status = CONNECTION_OK;
+					if (!PQsendQuery(conn, "SELECT pg_is_in_recovery()"))
+					{
+						restoreErrorMessage(conn, &savedMessage);
+						goto error_return;
+					}
+
+					conn->status = CONNECTION_CHECK_RECOVERY;
+
+					restoreErrorMessage(conn, &savedMessage);
+					return PGRES_POLLING_READING;
+				}
+
+				/*
+				 * If the requested type is prefer-read or prefer-standby,
+				 * record this host index and try the other hosts before
+				 * considering it later. If the requested type is read-only
+				 * or standby, ignore this connection.
+				 */
+
 				if (conn->requested_session_type == SESSION_TYPE_PREFER_READ ||
-					conn->requested_session_type == SESSION_TYPE_READ_ONLY)
+					conn->requested_session_type == SESSION_TYPE_READ_ONLY ||
+					conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY ||
+					conn->requested_session_type == SESSION_TYPE_STANDBY)
 				{
 					/*
 					 * The following scenario is possible only for the
-					 * prefer-read mode for the next pass of the list of
-					 * connections as it couldn't find any servers that are
-					 * default read-only.
+					 * prefer-read or prefer-standby mode for the next pass of
+					 * the list of connections as it couldn't find any servers
+					 * that are default read-only or in recovery mode.
 					 */
-					if (conn->read_write_host_index == -2)
-						goto target_accept_connection;
+					if (conn->read_write_or_primary_host_index == -2)
+						goto consume_checked_target_connection;
 
 					/* Close connection politely. */
 					conn->status = CONNECTION_OK;
 					sendTerminateConn(conn);
 
 					/* Record read-write host index */
-					if (conn->requested_session_type == SESSION_TYPE_PREFER_READ &&
-						conn->read_write_host_index == -1)
-						conn->read_write_host_index = conn->whichhost;
+					if (conn->requested_session_type == SESSION_TYPE_PREFER_READ ||
+						conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY)
+					{
+						if (conn->read_write_or_primary_host_index == -1)
+							conn->read_write_or_primary_host_index = conn->whichhost;
+					}
 
 					/*
 					 * Try next host if any, but we don't want to consider
@@ -3721,7 +3771,7 @@ keep_going:						/* We will come back to here until there is
 						 * connections as it couldn't find any servers that
 						 * are default read-only.
 						 */
-						if (conn->read_write_host_index == -2)
+						if (conn->read_write_or_primary_host_index == -2)
 							goto consume_checked_write_connection;
 
 						/* Not a requested type; fail this connection. */
@@ -3756,8 +3806,8 @@ keep_going:						/* We will come back to here until there is
 
 						/* Record read-write host index */
 						if (conn->requested_session_type == SESSION_TYPE_PREFER_READ &&
-							conn->read_write_host_index == -1)
-							conn->read_write_host_index = conn->whichhost;
+							conn->read_write_or_primary_host_index == -1)
+							conn->read_write_or_primary_host_index = conn->whichhost;
 
 						/*
 						 * Try next host if any, but we don't want to consider
@@ -3810,6 +3860,144 @@ keep_going:						/* We will come back to here until there is
 				goto keep_going;
 			}
 
+		case CONNECTION_CHECK_RECOVERY:
+			{
+				const char *displayed_host;
+				const char *displayed_port;
+
+				if (!saveErrorMessage(conn, &savedMessage))
+					goto error_return;
+
+				conn->status = CONNECTION_OK;
+				if (!PQconsumeInput(conn))
+				{
+					restoreErrorMessage(conn, &savedMessage);
+					goto error_return;
+				}
+
+				if (PQisBusy(conn))
+				{
+					conn->status = CONNECTION_CHECK_RECOVERY;
+					restoreErrorMessage(conn, &savedMessage);
+					return PGRES_POLLING_READING;
+				}
+
+				res = PQgetResult(conn);
+				if (res && (PQresultStatus(res) == PGRES_TUPLES_OK) &&
+					PQntuples(res) == 1)
+				{
+					char	   *val;
+					bool		standby_server;
+
+					val = PQgetvalue(res, 0, 0);
+					standby_server = (strncmp(val, "t", 1) == 0);
+
+					/*
+					 * Server is in recovery mode and requested mode is
+					 * primary, ignore it. Server is not in recovery mode and
+					 * requested mode is prefer-standby, record it for the
+					 * first time and try to consume in the next scan (it
+					 * means no standby server is found in the first scan).
+					 */
+					if ((standby_server &&
+						 conn->requested_session_type == SESSION_TYPE_PRIMARY) ||
+						(!standby_server &&
+						 (conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY ||
+						  conn->requested_session_type == SESSION_TYPE_STANDBY)))
+					{
+
+						/*
+						 * The following scenario is possible only for the
+						 * prefer-standby mode for the next pass of the list
+						 * of connections as it couldn't find any servers that
+						 * are in recovery.
+						 */
+						if (conn->read_write_or_primary_host_index == -2)
+							goto consume_checked_recovery_connection;
+
+						/* Not a requested type; fail this connection. */
+						PQclear(res);
+						restoreErrorMessage(conn, &savedMessage);
+
+						/* Append error report to conn->errorMessage. */
+						if (conn->connhost[conn->whichhost].type == CHT_HOST_ADDRESS)
+							displayed_host = conn->connhost[conn->whichhost].hostaddr;
+						else
+							displayed_host = conn->connhost[conn->whichhost].host;
+						displayed_port = conn->connhost[conn->whichhost].port;
+						if (displayed_port == NULL || displayed_port[0] == '\0')
+							displayed_port = DEF_PGPORT_STR;
+
+						if (conn->requested_session_type == SESSION_TYPE_PRIMARY)
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext("server is in recovery mode "
+															"\"%s:%s\"\n"),
+											  displayed_host, displayed_port);
+						else
+							appendPQExpBuffer(&conn->errorMessage,
+											  libpq_gettext("server is not in recovery mode "
+															"\"%s:%s\"\n"),
+											  displayed_host, displayed_port);
+
+						/* Close connection politely. */
+						conn->status = CONNECTION_OK;
+						sendTerminateConn(conn);
+
+						/* Record primary host index */
+						if (conn->requested_session_type == SESSION_TYPE_PREFER_STANDBY &&
+							conn->read_write_or_primary_host_index == -1)
+							conn->read_write_or_primary_host_index = conn->whichhost;
+
+						/*
+						 * Try next host if any, but we don't want to consider
+						 * additional addresses for this host.
+						 */
+						conn->try_next_host = true;
+						goto keep_going;
+					}
+
+			consume_checked_recovery_connection:
+					/* Session is requested type, so we're good. */
+					PQclear(res);
+					termPQExpBuffer(&savedMessage);
+
+					/*
+					 * Finish reading any remaining messages before being
+					 * considered as ready.
+					 */
+					conn->status = CONNECTION_CONSUME;
+					goto keep_going;
+				}
+
+				/*
+				 * Something went wrong with "SELECT pg_is_in_recovery()". We
+				 * should try next addresses.
+				 */
+				if (res)
+					PQclear(res);
+				restoreErrorMessage(conn, &savedMessage);
+
+				/* Append error report to conn->errorMessage. */
+				if (conn->connhost[conn->whichhost].type == CHT_HOST_ADDRESS)
+					displayed_host = conn->connhost[conn->whichhost].hostaddr;
+				else
+					displayed_host = conn->connhost[conn->whichhost].host;
+				displayed_port = conn->connhost[conn->whichhost].port;
+				if (displayed_port == NULL || displayed_port[0] == '\0')
+					displayed_port = DEF_PGPORT_STR;
+				appendPQExpBuffer(&conn->errorMessage,
+								  libpq_gettext("test \"SELECT pg_is_in_recovery()\" failed "
+												"on server \"%s:%s\"\n"),
+								  displayed_host, displayed_port);
+
+				/* Close connection politely. */
+				conn->status = CONNECTION_OK;
+				sendTerminateConn(conn);
+
+				/* Try next address */
+				conn->try_next_addr = true;
+				goto keep_going;
+			}
 		default:
 			appendPQExpBuffer(&conn->errorMessage,
 							  libpq_gettext("invalid connection state %d, "
@@ -3955,7 +4143,7 @@ makeEmptyPGconn(void)
 #endif
 
 	conn->requested_session_type = SESSION_TYPE_ANY;
-	conn->read_write_host_index = -1;
+	conn->read_write_or_primary_host_index = -1;
 
 	/*
 	 * We try to send at least 8K at a time, which is the usual size of pipe
diff --git a/src/interfaces/libpq/libpq-fe.h b/src/interfaces/libpq/libpq-fe.h
index a015579c43..6c5780c672 100644
--- a/src/interfaces/libpq/libpq-fe.h
+++ b/src/interfaces/libpq/libpq-fe.h
@@ -68,7 +68,8 @@ typedef enum
 	CONNECTION_CONSUME,			/* Wait for any pending message and consume
 								 * them. */
 	CONNECTION_GSS_STARTUP,		/* Negotiating GSSAPI. */
-	CONNECTION_CHECK_TARGET		/* Check if we have a proper target connection */
+	CONNECTION_CHECK_TARGET,	/* Check if we have a proper target connection */
+	 CONNECTION_CHECK_RECOVERY	/* Check whether server is in recovery */
 } ConnStatusType;
 
 typedef enum
@@ -76,7 +77,10 @@ typedef enum
 	SESSION_TYPE_ANY = 0,		/* Any session (default) */
 	SESSION_TYPE_READ_WRITE,	/* Read-write session */
 	SESSION_TYPE_PREFER_READ,	/* Prefer read only session */
-	SESSION_TYPE_READ_ONLY		/* Read only session */
+	SESSION_TYPE_READ_ONLY,		/* Read only session */
+	SESSION_TYPE_PRIMARY,		/* Primary server */
+	SESSION_TYPE_PREFER_STANDBY,	/* Prefer Standby server */
+	SESSION_TYPE_STANDBY		/* Standby server */
 }			TargetSessionAttrsType;
 
 typedef enum
diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h
index a00d8bddeb..740033e116 100644
--- a/src/interfaces/libpq/libpq-int.h
+++ b/src/interfaces/libpq/libpq-int.h
@@ -366,7 +366,7 @@ struct pg_conn
 
 	/*
 	 * Type of connection to make.  Possible values: any, read-write,
-	 * prefer-read and read-only.
+	 * prefer-read, read-only, primary, prefer-standby and standby.
 	 */
 	char	   *target_session_attrs;
 	TargetSessionAttrsType requested_session_type;
@@ -410,7 +410,7 @@ struct pg_conn
 	 * Initial value is -1, then the index of the first read-write host, -2
 	 * during the second attempt of connection to avoid recursion.
 	 */
-	int			read_write_host_index;
+	int			read_write_or_primary_host_index;
 
 	/* Connection data */
 	pgsocket	sock;			/* FD for socket, PGINVALID_SOCKET if
diff --git a/src/test/recovery/t/001_stream_rep.pl b/src/test/recovery/t/001_stream_rep.pl
index ac1e11e1ab..8fa28dab23 100644
--- a/src/test/recovery/t/001_stream_rep.pl
+++ b/src/test/recovery/t/001_stream_rep.pl
@@ -3,7 +3,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 37;
+use Test::More tests => 41;
 
 # Initialize master node
 my $node_master = get_new_node('master');
@@ -141,6 +141,22 @@ test_target_session_attrs($node_master, $node_standby_1, $node_standby_1,
 test_target_session_attrs($node_standby_1, $node_master, $node_standby_1,
 	"read-only", 0);
 
+# Connect to master in "primary" mode with standby1,master list.
+test_target_session_attrs($node_standby_1, $node_master, $node_master,
+	"primary", 0);
+
+# Connect to master in "prefer-standby" mode with master,master list.
+test_target_session_attrs($node_master, $node_master, $node_master,
+	"prefer-standby", 0);
+
+# Connect to standby1 in "prefer-standby" mode with master,standby1 list.
+test_target_session_attrs($node_master, $node_standby_1, $node_standby_1,
+	"prefer-standby", 0);
+
+# Connect to standby1 in "standby" mode with master,standby1 list.
+test_target_session_attrs($node_master, $node_standby_1, $node_standby_1,
+	"standby", 0);
+
 # Test for SHOW commands using a WAL sender connection with a replication
 # role.
 note "testing SHOW commands for replication connection";
-- 
2.20.1.windows.1

