Hallo Andres,
Slight aside: Have you ever looked at moving pgbench to non-blocking
connection establishment? It seems weird to use non-blocking everywhere
but connection establishment.
Attached is an attempt at doing that, mostly done for fun. It seems to be a
little slower on a local socket.
What do you think?
Maybe it would be worth having it as an option?
--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index b8864c6ae5..83ac2235a5 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -102,8 +102,9 @@ typedef struct socket_set
typedef struct socket_set
{
- int maxfd; /* largest FD currently set in fds */
- fd_set fds;
+ int maxfd; /* largest FD currently set in reads or writes */
+ fd_set reads;
+ fd_set writes;
} socket_set;
#endif /* POLL_USING_SELECT */
@@ -318,15 +319,32 @@ typedef enum
/*
* The client must first choose a script to execute. Once chosen, it can
* either be throttled (state CSTATE_PREPARE_THROTTLE under --rate), start
- * right away (state CSTATE_START_TX) or not start at all if the timer was
- * exceeded (state CSTATE_FINISHED).
+ * right away (state CSTATE_START_TX or CSTATE_CONNECT under --connect) or
+ * not start at all if the timer was exceeded (state CSTATE_FINISHED).
*/
CSTATE_CHOOSE_SCRIPT,
/*
- * CSTATE_START_TX performs start-of-transaction processing. Establishes
- * a new connection for the transaction in --connect mode, records the
- * transaction start time, and proceed to the first command.
+ * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
+ * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
+ * sleeps until that moment, then advances to CSTATE_CONNECT (-C) or
+ * CSTATE_START_TX (not -C), or CSTATE_FINISHED if the next transaction
+ * would start beyond the end of the run.
+ */
+ CSTATE_PREPARE_THROTTLE,
+ CSTATE_THROTTLE,
+
+ /*
+ * CSTATE_CONNECT establishes a connection asynchronously before starting
+ * a transaction, under -C. The state is then CSTATE_CONNECTING till the
+ * connection is established, and then CSTATE_START_TX.
+ */
+ CSTATE_CONNECT,
+ CSTATE_CONNECTING,
+
+ /*
+ * CSTATE_START_TX performs start-of-transaction processing.
+ * It records the transaction start time, and proceeds to the first command.
*
* Note: once a script is started, it will either error or run till its
* end, where it may be interrupted. It is not interrupted while running,
@@ -335,16 +353,6 @@ typedef enum
*/
CSTATE_START_TX,
- /*
- * In CSTATE_PREPARE_THROTTLE state, we calculate when to begin the next
- * transaction, and advance to CSTATE_THROTTLE. CSTATE_THROTTLE state
- * sleeps until that moment, then advances to CSTATE_START_TX, or
- * CSTATE_FINISHED if the next transaction would start beyond the end of
- * the run.
- */
- CSTATE_PREPARE_THROTTLE,
- CSTATE_THROTTLE,
-
/*
* We loop through these states, to process each command in the script:
*
@@ -401,6 +409,7 @@ typedef struct
int id; /* client No. */
ConnectionStateEnum state; /* state machine's current state. */
ConditionalStack cstack; /* enclosing conditionals state */
+ PostgresPollingStatusType poll_state; /* async connection status */
/*
* Separate randomness for each client. This is used for random functions
@@ -421,6 +430,7 @@ typedef struct
int64 sleep_until; /* scheduled start time of next cmd (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
+ instr_time conn_begin; /* start of asynchronous connection under -C */
bool prepared[MAX_SCRIPTS]; /* whether client prepared the script */
@@ -607,9 +617,9 @@ static void setalarm(int seconds);
static socket_set *alloc_socket_set(int count);
static void free_socket_set(socket_set *sa);
static void clear_socket_set(socket_set *sa);
-static void add_socket_to_set(socket_set *sa, int fd, int idx);
+static void add_socket_to_set(socket_set *sa, int fd, int idx, bool reading);
static int wait_on_socket_set(socket_set *sa, int64 usecs);
-static bool socket_has_input(socket_set *sa, int fd, int idx);
+static bool socket_is_ready(socket_set *sa, int fd, int idx);
/* callback functions for our flex lexer */
@@ -1165,6 +1175,57 @@ tryExecuteStatement(PGconn *con, const char *sql)
PQclear(res);
}
+#define PARAMS_ARRAY_SIZE 7
+
+/* set connection parameters */
+static void
+setPQconnectionParams(const char *keywords[PARAMS_ARRAY_SIZE],
+ const char *values[PARAMS_ARRAY_SIZE],
+ const char *password)
+{
+ keywords[0] = "host";
+ values[0] = pghost;
+ keywords[1] = "port";
+ values[1] = pgport;
+ keywords[2] = "user";
+ values[2] = login;
+ keywords[3] = "password";
+ values[3] = password;
+ keywords[4] = "dbname";
+ values[4] = dbName;
+ keywords[5] = "fallback_application_name";
+ values[5] = progname;
+ keywords[6] = NULL;
+ values[6] = NULL;
+}
+
+/* start a connection to the backend, asynchronously */
+static PGconn *
+doAsyncConnect(void)
+{
+ const char *keywords[PARAMS_ARRAY_SIZE];
+ const char *values[PARAMS_ARRAY_SIZE];
+ PGconn *conn;
+
+ setPQconnectionParams(keywords, values, NULL);
+ conn = PQconnectStartParams(keywords, values, true);
+
+ if (!conn)
+ pg_log_error("async connection to database \"%s\" failed on start", dbName);
+ else if (PQstatus(conn) == CONNECTION_BAD)
+ {
+ if (PQconnectionNeedsPassword(conn))
+ pg_log_error("async connection to database \"%s\" expecting a password", dbName);
+ else
+ pg_log_error("async connection to database \"%s\" failed", dbName);
+
+ PQfinish(conn);
+ conn = NULL;
+ }
+
+ return conn;
+}
+
/* set up a connection to the backend */
static PGconn *
doConnect(void)
@@ -1180,26 +1241,10 @@ doConnect(void)
*/
do
{
-#define PARAMS_ARRAY_SIZE 7
-
const char *keywords[PARAMS_ARRAY_SIZE];
const char *values[PARAMS_ARRAY_SIZE];
- keywords[0] = "host";
- values[0] = pghost;
- keywords[1] = "port";
- values[1] = pgport;
- keywords[2] = "user";
- values[2] = login;
- keywords[3] = "password";
- values[3] = have_password ? password : NULL;
- keywords[4] = "dbname";
- values[4] = dbName;
- keywords[5] = "fallback_application_name";
- values[5] = progname;
- keywords[6] = NULL;
- values[6] = NULL;
-
+ setPQconnectionParams(keywords, values, have_password ? password : NULL);
new_pass = false;
conn = PQconnectdbParams(keywords, values, true);
@@ -2897,31 +2942,58 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
* a new transaction, or to get throttled if that's requested.
*/
st->state = timer_exceeded ? CSTATE_FINISHED :
- throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE : CSTATE_START_TX;
+ throttle_delay > 0 ? CSTATE_PREPARE_THROTTLE :
+ is_connect ? CSTATE_CONNECT : CSTATE_START_TX;
break;
/* Start new transaction (script) */
- case CSTATE_START_TX:
+ case CSTATE_CONNECT:
- /* establish connection if needed, i.e. under --connect */
- if (st->con == NULL)
+ /* establish async connection under --connect */
+ Assert(st->con == NULL);
+
+ INSTR_TIME_SET_CURRENT_LAZY(now);
+ st->conn_begin = now;
+
+ if ((st->con = doAsyncConnect()) == NULL)
{
- instr_time start;
+ pg_log_error("client %d aborted while starting async connection", st->id);
+ st->state = CSTATE_ABORTED;
+ }
+ else
+ {
+ st->state = CSTATE_CONNECTING;
+ /* once PQconnectPoll is called, either _READING or _WRITING */
+ st->poll_state = PGRES_POLLING_OK;
+ }
+ break;
+
+ case CSTATE_CONNECTING:
- INSTR_TIME_SET_CURRENT_LAZY(now);
- start = now;
- if ((st->con = doConnect()) == NULL)
- {
- pg_log_error("client %d aborted while establishing connection", st->id);
- st->state = CSTATE_ABORTED;
- break;
- }
+ Assert(st->con != NULL);
+
+ st->poll_state = PQconnectPoll(st->con);
+
+ if (st->poll_state == PGRES_POLLING_FAILED)
+ {
+ pg_log_error("client %d aborted while establishing connection", st->id);
+ st->state = CSTATE_ABORTED;
+ }
+ else if (st->poll_state == PGRES_POLLING_OK)
+ {
+ /* record elapsed connection time */
INSTR_TIME_SET_CURRENT(now);
- INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
-
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, st->conn_begin);
/* Reset session-local state */
memset(st->prepared, 0, sizeof(st->prepared));
+ st->state = CSTATE_START_TX;
}
+ /* else PGRES_POLLING_{READING,WRITING}, i.e. waiting for something */
+ break;
+
+ case CSTATE_START_TX:
+
+ Assert(st->con != NULL && PQstatus(st->con) == CONNECTION_OK);
/* record transaction start time */
INSTR_TIME_SET_CURRENT_LAZY(now);
@@ -3012,7 +3084,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
return; /* still sleeping, nothing to do here */
/* done sleeping, but don't start transaction if we're done */
- st->state = timer_exceeded ? CSTATE_FINISHED : CSTATE_START_TX;
+ st->state = timer_exceeded ? CSTATE_FINISHED :
+ is_connect ? CSTATE_CONNECT : CSTATE_START_TX;
break;
/*
@@ -6308,13 +6381,16 @@ threadRun(void *arg)
if (min_usec > this_usec)
min_usec = this_usec;
}
- else if (st->state == CSTATE_WAIT_RESULT)
+ else if (st->state == CSTATE_WAIT_RESULT ||
+ /* CONNECTING with POLLING_OK only occurs right after connection start */
+ (st->state == CSTATE_CONNECTING && st->poll_state != PGRES_POLLING_OK))
{
/*
* waiting for result from server - nothing to do unless the
* socket is readable
*/
int sock = PQsocket(st->con);
+ bool reading = st->state == CSTATE_WAIT_RESULT || st->poll_state == PGRES_POLLING_READING;
if (sock < 0)
{
@@ -6322,7 +6398,7 @@ threadRun(void *arg)
goto done;
}
- add_socket_to_set(sockets, sock, nsocks++);
+ add_socket_to_set(sockets, sock, nsocks++, reading);
}
else if (st->state != CSTATE_ABORTED &&
st->state != CSTATE_FINISHED)
@@ -6415,7 +6491,7 @@ threadRun(void *arg)
goto done;
}
- if (!socket_has_input(sockets, sock, nsocks++))
+ if (!socket_is_ready(sockets, sock, nsocks++))
continue;
}
else if (st->state == CSTATE_FINISHED ||
@@ -6569,9 +6645,9 @@ setalarm(int seconds)
* to expire. timeout is measured in microseconds; 0 means wait forever.
* Returns result code of underlying syscall (>=0 if OK, else see errno).
*
- * socket_has_input: after waiting, call this to see if given socket has
- * input. fd and idx parameters should match some previous call to
- * add_socket_to_set.
+ * socket_is_ready: after waiting, call this to see if given socket has
+ * input or is ready for writing. fd and idx parameters should match
+ * some previous call to add_socket_to_set.
*
* Note that wait_on_socket_set destructively modifies the state of the
* socket set. After checking for input, caller must apply clear_socket_set
@@ -6605,11 +6681,11 @@ clear_socket_set(socket_set *sa)
}
static void
-add_socket_to_set(socket_set *sa, int fd, int idx)
+add_socket_to_set(socket_set *sa, int fd, int idx, bool reading)
{
Assert(idx < sa->maxfds && idx == sa->curfds);
sa->pollfds[idx].fd = fd;
- sa->pollfds[idx].events = POLLIN;
+ sa->pollfds[idx].events = reading ? POLLIN : POLLOUT;
sa->pollfds[idx].revents = 0;
sa->curfds++;
}
@@ -6632,7 +6708,7 @@ wait_on_socket_set(socket_set *sa, int64 usecs)
}
static bool
-socket_has_input(socket_set *sa, int fd, int idx)
+socket_is_ready(socket_set *sa, int fd, int idx)
{
/*
* In some cases, threadRun will apply clear_socket_set and then try to
@@ -6646,7 +6722,7 @@ socket_has_input(socket_set *sa, int fd, int idx)
return false;
Assert(idx < sa->curfds && sa->pollfds[idx].fd == fd);
- return (sa->pollfds[idx].revents & POLLIN) != 0;
+ return (sa->pollfds[idx].revents & (POLLIN|POLLOUT)) != 0;
}
#endif /* POLL_USING_PPOLL */
@@ -6668,12 +6744,13 @@ free_socket_set(socket_set *sa)
static void
clear_socket_set(socket_set *sa)
{
- FD_ZERO(&sa->fds);
+ FD_ZERO(&sa->reads);
+ FD_ZERO(&sa->writes);
sa->maxfd = -1;
}
static void
-add_socket_to_set(socket_set *sa, int fd, int idx)
+add_socket_to_set(socket_set *sa, int fd, int idx, bool reading)
{
if (fd < 0 || fd >= FD_SETSIZE)
{
@@ -6684,7 +6761,7 @@ add_socket_to_set(socket_set *sa, int fd, int idx)
pg_log_fatal("too many client connections for select()");
exit(1);
}
- FD_SET(fd, &sa->fds);
+ FD_SET(fd, reading ? &sa->reads : &sa->writes);
if (fd > sa->maxfd)
sa->maxfd = fd;
}
@@ -6698,18 +6775,18 @@ wait_on_socket_set(socket_set *sa, int64 usecs)
timeout.tv_sec = usecs / 1000000;
timeout.tv_usec = usecs % 1000000;
- return select(sa->maxfd + 1, &sa->fds, NULL, NULL, &timeout);
+ return select(sa->maxfd + 1, &sa->reads, &sa->writes, NULL, &timeout);
}
else
{
- return select(sa->maxfd + 1, &sa->fds, NULL, NULL, NULL);
+ return select(sa->maxfd + 1, &sa->reads, &sa->writes, NULL, NULL);
}
}
static bool
-socket_has_input(socket_set *sa, int fd, int idx)
+socket_is_ready(socket_set *sa, int fd, int idx)
{
- return (FD_ISSET(fd, &sa->fds) != 0);
+ return FD_ISSET(fd, &sa->reads) != 0 || FD_ISSET(fd, &sa->writes) != 0;
}
#endif /* POLL_USING_SELECT */