Hallo Andres,

Slight aside: Have you ever looked at moving pgbench to non-blocking connection establishment? It seems weird to use non-blocking everywhere but connection establishment.

Attached is an attempt at doing that, mostly done for fun. It seems to be a little slower on a local socket.

What do you think?

Maybe it would be worth having it with an option?

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index b8864c6ae5..83ac2235a5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -102,8 +102,9 @@ typedef struct socket_set
 
 typedef struct socket_set
 {
-	int			maxfd;			/* largest FD currently set in fds */
-	fd_set		fds;
+	int			maxfd;			/* largest FD currently set in reads or writes */
+	fd_set		reads;
+	fd_set		writes;
 } socket_set;
 
 #endif							/* POLL_USING_SELECT */
@@ -318,15 +319,32 @@ typedef enum
 	/*
 	 * The client must first choose a script to execute.  Once chosen, it can
 	 * either be throttled (state CSTATE_PREPARE_THROTTLE under --rate), start
-	 * right away (state CSTATE_START_TX) or not start at all if the timer was
-	 * exceeded (state CSTATE_FINISHED).
+	 * right away (state CSTATE_START_TX, or CSTATE_CONNECT under --connect) or
+	 * not start at all if the timer was exceeded (state CSTATE_FINISHED).
 	 */
 	CSTATE_CHOOSE_SCRIPT,
 
 	/*
-	 * CSTATE_START_TX performs start-of-transaction processing.  Establishes
-	 * a new connection for the transaction in --connect mode, records the
-	 * transaction start time, and proceed to the first command.
+	 * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
+	 * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
+	 * sleeps until that moment, then advances to CSTATE_CONNECT (-C) or
+	 * CSTATE_START_TX (not -C), or CSTATE_FINISHED if the next transaction
+	 * would start beyond the end of the run.
+	 */
+	CSTATE_PREPARE_THROTTLE,
+	CSTATE_THROTTLE,
+
+	/*
+	 * CSTATE_CONNECT establishes a connection asynchronously before starting
+	 * a transaction, under -C. The state is then CSTATE_CONNECTING until the
+	 * connection is established, and then CSTATE_START_TX.
+	 */
+	CSTATE_CONNECT,
+	CSTATE_CONNECTING,
+
+	/*
+	 * CSTATE_START_TX performs start-of-transaction processing.
+	 * It records the transaction start time, and proceeds to the first command.
 	 *
 	 * Note: once a script is started, it will either error or run till its
 	 * end, where it may be interrupted. It is not interrupted while running,
@@ -335,16 +353,6 @@ typedef enum
 	 */
 	CSTATE_START_TX,
 
-	/*
-	 * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
-	 * transaction, and advance to CSTATE_THROTTLE.  CSTATE_THROTTLE state
-	 * sleeps until that moment, then advances to CSTATE_START_TX, or
-	 * CSTATE_FINISHED if the next transaction would start beyond the end of
-	 * the run.
-	 */
-	CSTATE_PREPARE_THROTTLE,
-	CSTATE_THROTTLE,
-
 	/*
 	 * We loop through these states, to process each command in the script:
 	 *
@@ -401,6 +409,7 @@ typedef struct
 	int			id;				/* client No. */
 	ConnectionStateEnum state;	/* state machine's current state. */
 	ConditionalStack cstack;	/* enclosing conditionals state */
+	PostgresPollingStatusType	poll_state; /* async connection status */
 
 	/*
 	 * Separate randomness for each client. This is used for random functions
@@ -421,6 +430,7 @@ typedef struct
 	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
 	instr_time	txn_begin;		/* used for measuring schedule lag times */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
+	instr_time	conn_begin;		/* start of asynchronous connection under -C */
 
 	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
 
@@ -607,9 +617,9 @@ static void setalarm(int seconds);
 static socket_set *alloc_socket_set(int count);
 static void free_socket_set(socket_set *sa);
 static void clear_socket_set(socket_set *sa);
-static void add_socket_to_set(socket_set *sa, int fd, int idx);
+static void add_socket_to_set(socket_set *sa, int fd, int idx, bool reading);
 static int	wait_on_socket_set(socket_set *sa, int64 usecs);
-static bool socket_has_input(socket_set *sa, int fd, int idx);
+static bool socket_is_ready(socket_set *sa, int fd, int idx);
 
 
 /* callback functions for our flex lexer */
@@ -1165,6 +1175,57 @@ tryExecuteStatement(PGconn *con, const char *sql)
 	PQclear(res);
 }
 
+#define PARAMS_ARRAY_SIZE	7
+
+/* fill the NULL-terminated keywords[]/values[] libpq arrays from the global settings and password */
+static void
+setPQconnectionParams(const char *keywords[PARAMS_ARRAY_SIZE],
+					  const char *values[PARAMS_ARRAY_SIZE],
+					  const char *password)
+{
+	keywords[0] = "host";
+	values[0] = pghost;
+	keywords[1] = "port";
+	values[1] = pgport;
+	keywords[2] = "user";
+	values[2] = login;
+	keywords[3] = "password";
+	values[3] = password;
+	keywords[4] = "dbname";
+	values[4] = dbName;
+	keywords[5] = "fallback_application_name";
+	values[5] = progname;
+	keywords[6] = NULL;
+	values[6] = NULL;
+}
+
+/* start an async connection (no password given); logs and returns NULL on failure */
+static PGconn *
+doAsyncConnect(void)
+{
+	const char *keywords[PARAMS_ARRAY_SIZE];
+	const char *values[PARAMS_ARRAY_SIZE];
+	PGconn	   *conn;
+
+	setPQconnectionParams(keywords, values, NULL);
+	conn = PQconnectStartParams(keywords, values, true);
+
+	if (!conn)
+		pg_log_error("async connection to database \"%s\" failed on start", dbName);
+	else if (PQstatus(conn) == CONNECTION_BAD)
+	{
+		if (PQconnectionNeedsPassword(conn))
+			pg_log_error("async connection to database \"%s\" expecting a password", dbName);
+		else
+			pg_log_error("async connection to database \"%s\" failed: %s", dbName, PQerrorMessage(conn));
+
+		PQfinish(conn);
+		conn = NULL;
+	}
+
+	return conn;
+}
+
 /* set up a connection to the backend */
 static PGconn *
 doConnect(void)
@@ -1180,26 +1241,10 @@ doConnect(void)
 	 */
 	do
 	{
-#define PARAMS_ARRAY_SIZE	7
-
 		const char *keywords[PARAMS_ARRAY_SIZE];
 		const char *values[PARAMS_ARRAY_SIZE];
 
-		keywords[0] = "host";
-		values[0] = pghost;
-		keywords[1] = "port";
-		values[1] = pgport;
-		keywords[2] = "user";
-		values[2] = login;
-		keywords[3] = "password";
-		values[3] = have_password ? password : NULL;
-		keywords[4] = "dbname";
-		values[4] = dbName;
-		keywords[5] = "fallback_application_name";
-		values[5] = progname;
-		keywords[6] = NULL;
-		values[6] = NULL;
-
+		setPQconnectionParams(keywords, values, have_password ? password : NULL);
 		new_pass = false;
 
 		conn = PQconnectdbParams(keywords, values, true);
@@ -2897,31 +2942,58 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				 * a new transaction, or to get throttled if that's requested.
 				 */
 				st->state = timer_exceeded ? CSTATE_FINISHED :
-					throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE : CSTATE_START_TX;
+					throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE :
+						is_connect ? CSTATE_CONNECT : CSTATE_START_TX;
 				break;
 
 				/* Start new transaction (script) */
-			case CSTATE_START_TX:
+			case CSTATE_CONNECT:
 
-				/* establish connection if needed, i.e. under --connect */
-				if (st->con == NULL)
+				/* establish async connection under --connect */
+				Assert(st->con == NULL);
+
+				INSTR_TIME_SET_CURRENT_LAZY(now);
+				st->conn_begin = now;
+
+				if ((st->con = doAsyncConnect()) == NULL)
 				{
-					instr_time	start;
+					pg_log_error("client %d aborted while starting async connection", st->id);
+					st->state = CSTATE_ABORTED;
+				}
+				else
+				{
+					st->state = CSTATE_CONNECTING;
+					/* OK means "not polled yet"; PQconnectPoll then sets _READING/_WRITING */
+					st->poll_state = PGRES_POLLING_OK;
+				}
+				break;
+
+			case CSTATE_CONNECTING:
 
-					INSTR_TIME_SET_CURRENT_LAZY(now);
-					start = now;
-					if ((st->con = doConnect()) == NULL)
-					{
-						pg_log_error("client %d aborted while establishing connection", st->id);
-						st->state = CSTATE_ABORTED;
-						break;
-					}
+				Assert(st->con != NULL);
+
+				st->poll_state = PQconnectPoll(st->con);
+
+				if (st->poll_state == PGRES_POLLING_FAILED)
+				{
+					pg_log_error("client %d aborted while establishing connection", st->id);
+					st->state = CSTATE_ABORTED;
+				}
+				else if (st->poll_state == PGRES_POLLING_OK)
+				{
+					/* record elapsed connection time */
 					INSTR_TIME_SET_CURRENT(now);
-					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
-
+					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, st->conn_begin);
 					/* Reset session-local state */
 					memset(st->prepared, 0, sizeof(st->prepared));
+					st->state = CSTATE_START_TX;
 				}
+				/* else PGRES_POLLING_{READING,WRITING}, i.e. waiting for something */
+				break;
+
+			case CSTATE_START_TX:
+
+				Assert(st->con != NULL && PQstatus(st->con) == CONNECTION_OK);
 
 				/* record transaction start time */
 				INSTR_TIME_SET_CURRENT_LAZY(now);
@@ -3012,7 +3084,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 					return;		/* still sleeping, nothing to do here */
 
 				/* done sleeping, but don't start transaction if we're done */
-				st->state = timer_exceeded ? CSTATE_FINISHED : CSTATE_START_TX;
+				st->state = timer_exceeded ? CSTATE_FINISHED :
+					is_connect ? CSTATE_CONNECT : CSTATE_START_TX;
 				break;
 
 				/*
@@ -6308,13 +6381,16 @@ threadRun(void *arg)
 				if (min_usec > this_usec)
 					min_usec = this_usec;
 			}
-			else if (st->state == CSTATE_WAIT_RESULT)
+			else if (st->state == CSTATE_WAIT_RESULT ||
+					 /* CONNECTING with POLLING_OK only occurs before the first PQconnectPoll */
+					 (st->state == CSTATE_CONNECTING && st->poll_state != PGRES_POLLING_OK))
 			{
 				/*
 				 * waiting for result from server - nothing to do unless the
 				 * socket is readable
 				 */
 				int			sock = PQsocket(st->con);
+				bool		reading = st->state == CSTATE_WAIT_RESULT || st->poll_state == PGRES_POLLING_READING;
 
 				if (sock < 0)
 				{
@@ -6322,7 +6398,7 @@ threadRun(void *arg)
 					goto done;
 				}
 
-				add_socket_to_set(sockets, sock, nsocks++);
+				add_socket_to_set(sockets, sock, nsocks++, reading);
 			}
 			else if (st->state != CSTATE_ABORTED &&
 					 st->state != CSTATE_FINISHED)
@@ -6415,7 +6491,7 @@ threadRun(void *arg)
 					goto done;
 				}
 
-				if (!socket_has_input(sockets, sock, nsocks++))
+				if (!socket_is_ready(sockets, sock, nsocks++))
 					continue;
 			}
 			else if (st->state == CSTATE_FINISHED ||
@@ -6569,9 +6645,9 @@ setalarm(int seconds)
  *		to expire.  timeout is measured in microseconds; 0 means wait forever.
  *		Returns result code of underlying syscall (>=0 if OK, else see errno).
  *
- * socket_has_input: after waiting, call this to see if given socket has
- *		input.  fd and idx parameters should match some previous call to
- *		add_socket_to_set.
+ * socket_is_ready: after waiting, call this to see if given socket has
+ *		input or is ready for writing.  fd and idx parameters should match
+ *		some previous call to add_socket_to_set.
  *
  * Note that wait_on_socket_set destructively modifies the state of the
  * socket set.  After checking for input, caller must apply clear_socket_set
@@ -6605,11 +6681,11 @@ clear_socket_set(socket_set *sa)
 }
 
 static void
-add_socket_to_set(socket_set *sa, int fd, int idx)
+add_socket_to_set(socket_set *sa, int fd, int idx, bool reading)
 {
 	Assert(idx < sa->maxfds && idx == sa->curfds);
 	sa->pollfds[idx].fd = fd;
-	sa->pollfds[idx].events = POLLIN;
+	sa->pollfds[idx].events = reading ? POLLIN : POLLOUT;
 	sa->pollfds[idx].revents = 0;
 	sa->curfds++;
 }
@@ -6632,7 +6708,7 @@ wait_on_socket_set(socket_set *sa, int64 usecs)
 }
 
 static bool
-socket_has_input(socket_set *sa, int fd, int idx)
+socket_is_ready(socket_set *sa, int fd, int idx)
 {
 	/*
 	 * In some cases, threadRun will apply clear_socket_set and then try to
@@ -6646,7 +6722,7 @@ socket_has_input(socket_set *sa, int fd, int idx)
 		return false;
 
 	Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
-	return (sa->pollfds[idx].revents & POLLIN) != 0;
+	return (sa->pollfds[idx].revents & (POLLIN|POLLOUT)) != 0;
 }
 
 #endif							/* POLL_USING_PPOLL */
@@ -6668,12 +6744,13 @@ free_socket_set(socket_set *sa)
 static void
 clear_socket_set(socket_set *sa)
 {
-	FD_ZERO(&sa->fds);
+	FD_ZERO(&sa->reads);
+	FD_ZERO(&sa->writes);
 	sa->maxfd = -1;
 }
 
 static void
-add_socket_to_set(socket_set *sa, int fd, int idx)
+add_socket_to_set(socket_set *sa, int fd, int idx, bool reading)
 {
 	if (fd < 0 || fd >= FD_SETSIZE)
 	{
@@ -6684,7 +6761,7 @@ add_socket_to_set(socket_set *sa, int fd, int idx)
 		pg_log_fatal("too many client connections for select()");
 		exit(1);
 	}
-	FD_SET(fd, &sa->fds);
+	FD_SET(fd, reading ? &sa->reads : &sa->writes);
 	if (fd > sa->maxfd)
 		sa->maxfd = fd;
 }
@@ -6698,18 +6775,18 @@ wait_on_socket_set(socket_set *sa, int64 usecs)
 
 		timeout.tv_sec = usecs / 1000000;
 		timeout.tv_usec = usecs % 1000000;
-		return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
+		return select(sa->maxfd + 1, &sa->reads, &sa->writes, NULL, &timeout);
 	}
 	else
 	{
-		return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
+		return select(sa->maxfd + 1, &sa->reads, &sa->writes, NULL, NULL);
 	}
 }
 
 static bool
-socket_has_input(socket_set *sa, int fd, int idx)
+socket_is_ready(socket_set *sa, int fd, int idx)
 {
-	return (FD_ISSET(fd, &sa->fds) != 0);
+	return FD_ISSET(fd, &sa->reads) != 0 || FD_ISSET(fd, &sa->writes) != 0;
 }
 
 #endif							/* POLL_USING_SELECT */

Reply via email to