On Thu, Mar 4, 2021 at 10:44 PM Thomas Munro <thomas.mu...@gmail.com> wrote:
> v10-0002-pgbench-Refactor-the-way-we-do-thread-portabilit.patch

Here's a better version of that part.  I don't yet know if it actually
works on Windows...
From 3aa63dfc086ab1f687ed668091a6bda8bf270fa7 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Sat, 2 Jan 2021 15:05:06 +1300
Subject: [PATCH v11 1/6] Add missing pthread_barrier_t.

Supply a simple implementation of the missing pthread_barrier_t type and
functions, for macOS.

Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
---
 configure                       | 69 +++++++++++++++++++++++++++++++++
 configure.ac                    |  2 +
 src/include/pg_config.h.in      |  3 ++
 src/include/port/pg_pthread.h   | 41 ++++++++++++++++++++
 src/port/pthread_barrier_wait.c | 66 +++++++++++++++++++++++++++++++
 src/tools/msvc/Solution.pm      |  1 +
 6 files changed, 182 insertions(+)
 create mode 100644 src/include/port/pg_pthread.h
 create mode 100644 src/port/pthread_barrier_wait.c

diff --git a/configure b/configure
index ce9ea36999..fad817bb38 100755
--- a/configure
+++ b/configure
@@ -11635,6 +11635,62 @@ if test "$ac_res" != no; then :
 
 fi
 
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing pthread_barrier_wait" >&5
+$as_echo_n "checking for library containing pthread_barrier_wait... " >&6; }
+if ${ac_cv_search_pthread_barrier_wait+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  ac_func_search_save_LIBS=$LIBS
+cat confdefs.h - <<_ACEOF >conftest.$ac_ext
+/* end confdefs.h.  */
+
+/* Override any GCC internal prototype to avoid an error.
+   Use char because int might match the return type of a GCC
+   builtin and then its argument prototype would still apply.  */
+#ifdef __cplusplus
+extern "C"
+#endif
+char pthread_barrier_wait ();
+int
+main ()
+{
+return pthread_barrier_wait ();
+  ;
+  return 0;
+}
+_ACEOF
+for ac_lib in '' pthread; do
+  if test -z "$ac_lib"; then
+    ac_res="none required"
+  else
+    ac_res=-l$ac_lib
+    LIBS="-l$ac_lib  $ac_func_search_save_LIBS"
+  fi
+  if ac_fn_c_try_link "$LINENO"; then :
+  ac_cv_search_pthread_barrier_wait=$ac_res
+fi
+rm -f core conftest.err conftest.$ac_objext \
+    conftest$ac_exeext
+  if ${ac_cv_search_pthread_barrier_wait+:} false; then :
+  break
+fi
+done
+if ${ac_cv_search_pthread_barrier_wait+:} false; then :
+
+else
+  ac_cv_search_pthread_barrier_wait=no
+fi
+rm conftest.$ac_ext
+LIBS=$ac_func_search_save_LIBS
+fi
+{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_search_pthread_barrier_wait" >&5
+$as_echo "$ac_cv_search_pthread_barrier_wait" >&6; }
+ac_res=$ac_cv_search_pthread_barrier_wait
+if test "$ac_res" != no; then :
+  test "$ac_res" = "none required" || LIBS="$ac_res $LIBS"
+
+fi
+
 # Solaris:
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking for library containing fdatasync" >&5
 $as_echo_n "checking for library containing fdatasync... " >&6; }
@@ -15883,6 +15939,19 @@ esac
 
 fi
 
+ac_fn_c_check_func "$LINENO" "pthread_barrier_wait" "ac_cv_func_pthread_barrier_wait"
+if test "x$ac_cv_func_pthread_barrier_wait" = xyes; then :
+  $as_echo "#define HAVE_PTHREAD_BARRIER_WAIT 1" >>confdefs.h
+
+else
+  case " $LIBOBJS " in
+  *" pthread_barrier_wait.$ac_objext "* ) ;;
+  *) LIBOBJS="$LIBOBJS pthread_barrier_wait.$ac_objext"
+ ;;
+esac
+
+fi
+
 ac_fn_c_check_func "$LINENO" "pwrite" "ac_cv_func_pwrite"
 if test "x$ac_cv_func_pwrite" = xyes; then :
   $as_echo "#define HAVE_PWRITE 1" >>confdefs.h
diff --git a/configure.ac b/configure.ac
index f54f65febe..0ed53571dd 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1143,6 +1143,7 @@ AC_SEARCH_LIBS(getopt_long, [getopt gnugetopt])
 AC_SEARCH_LIBS(shm_open, rt)
 AC_SEARCH_LIBS(shm_unlink, rt)
 AC_SEARCH_LIBS(clock_gettime, [rt posix4])
+AC_SEARCH_LIBS(pthread_barrier_wait, pthread)
 # Solaris:
 AC_SEARCH_LIBS(fdatasync, [rt posix4])
 # Required for thread_test.c on Solaris
@@ -1743,6 +1744,7 @@ AC_REPLACE_FUNCS(m4_normalize([
 	mkdtemp
 	pread
 	preadv
+	pthread_barrier_wait
 	pwrite
 	pwritev
 	random
diff --git a/src/include/pg_config.h.in b/src/include/pg_config.h.in
index 04dc330119..7a7cc21d8d 100644
--- a/src/include/pg_config.h.in
+++ b/src/include/pg_config.h.in
@@ -424,6 +424,9 @@
 /* Define if you have POSIX threads libraries and header files. */
 #undef HAVE_PTHREAD
 
+/* Define to 1 if you have the `pthread_barrier_wait' function. */
+#undef HAVE_PTHREAD_BARRIER_WAIT
+
 /* Define to 1 if you have the `pthread_is_threaded_np' function. */
 #undef HAVE_PTHREAD_IS_THREADED_NP
 
diff --git a/src/include/port/pg_pthread.h b/src/include/port/pg_pthread.h
new file mode 100644
index 0000000000..5222cdce6e
--- /dev/null
+++ b/src/include/port/pg_pthread.h
@@ -0,0 +1,41 @@
+/*-------------------------------------------------------------------------
+ *
+ * Declarations for missing POSIX thread components.
+ *
+ *	  Currently this supplies an implementation of pthread_barrier_t for the
+ *	  benefit of macOS, which lacks it as of release 11.  These declarations
+ *	  are not in port.h, because that'd require <pthread.h> to be included by
+ *	  every translation unit.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef PG_PTHREAD_H
+#define PG_PTHREAD_H
+
+#include <pthread.h>
+
+#ifndef HAVE_PTHREAD_BARRIER_WAIT
+
+#ifndef PTHREAD_BARRIER_SERIAL_THREAD
+#define PTHREAD_BARRIER_SERIAL_THREAD (-1)
+#endif
+
+typedef struct pg_pthread_barrier
+{
+	bool		sense;			/* we only need a one bit phase */
+	int			count;			/* number of threads expected */
+	int			arrived;		/* number of threads that have arrived */
+	pthread_mutex_t mutex;		/* held while reading or updating the above */
+	pthread_cond_t cond;		/* broadcast once the last arrival flips sense */
+} pthread_barrier_t;
+
+extern int pthread_barrier_init(pthread_barrier_t *barrier,
+								const void *attr,
+								int count);
+extern int pthread_barrier_wait(pthread_barrier_t *barrier);
+extern int pthread_barrier_destroy(pthread_barrier_t *barrier);
+
+#endif
+
+#endif
diff --git a/src/port/pthread_barrier_wait.c b/src/port/pthread_barrier_wait.c
new file mode 100644
index 0000000000..08dacc3085
--- /dev/null
+++ b/src/port/pthread_barrier_wait.c
@@ -0,0 +1,66 @@
+#include "postgres_fe.h"
+
+#include "port/pg_pthread.h"
+
+/* Initialize a barrier for "count" threads; returns 0 or an error number. */
+int
+pthread_barrier_init(pthread_barrier_t *barrier, const void *attr, int count)
+{
+	int			error;
+
+	barrier->sense = false;
+	barrier->count = count;
+	barrier->arrived = 0;
+	/* pthread functions return an error number rather than setting errno. */
+	if ((error = pthread_cond_init(&barrier->cond, NULL)) != 0)
+		return error;
+	if ((error = pthread_mutex_init(&barrier->mutex, NULL)) != 0)
+	{
+		pthread_cond_destroy(&barrier->cond);
+		return error;
+	}
+
+	return 0;
+}
+
+int
+pthread_barrier_wait(pthread_barrier_t *barrier)
+{
+	bool		initial_sense;
+
+	pthread_mutex_lock(&barrier->mutex);
+
+	/* We have arrived at the barrier (the mutex protects the counter). */
+	barrier->arrived++;
+	Assert(barrier->arrived <= barrier->count);
+
+	/* Last to arrive: reset for the next round, flip sense, wake the rest. */
+	if (barrier->arrived == barrier->count)
+	{
+		barrier->arrived = 0;
+		barrier->sense = !barrier->sense;
+		pthread_mutex_unlock(&barrier->mutex);
+		pthread_cond_broadcast(&barrier->cond);
+
+		return PTHREAD_BARRIER_SERIAL_THREAD;
+	}
+
+	/* Sleep until the sense flips; re-check after every wakeup. */
+	initial_sense = barrier->sense;
+	do
+	{
+		pthread_cond_wait(&barrier->cond, &barrier->mutex);
+	} while (barrier->sense == initial_sense);
+
+	pthread_mutex_unlock(&barrier->mutex);
+
+	return 0;
+}
+
+int
+pthread_barrier_destroy(pthread_barrier_t *barrier)
+{
+	pthread_cond_destroy(&barrier->cond);	/* return value is not checked */
+	pthread_mutex_destroy(&barrier->mutex); /* likewise */
+	return 0;					/* always reports success */
+}
diff --git a/src/tools/msvc/Solution.pm b/src/tools/msvc/Solution.pm
index 69b08591cc..a4f5cc4bdb 100644
--- a/src/tools/msvc/Solution.pm
+++ b/src/tools/msvc/Solution.pm
@@ -333,6 +333,7 @@ sub GenerateFiles
 		HAVE_PSTAT                  => undef,
 		HAVE_PS_STRINGS             => undef,
 		HAVE_PTHREAD                => undef,
+		HAVE_PTHREAD_BARRIER_WAIT   => undef,
 		HAVE_PTHREAD_IS_THREADED_NP => undef,
 		HAVE_PTHREAD_PRIO_INHERIT   => undef,
 		HAVE_PWRITE                 => undef,
-- 
2.30.0

From 3b7941869aa3f0960aec9c367942ed13e66c071c Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 4 Mar 2021 00:42:13 +1300
Subject: [PATCH v11 2/6] pgbench: Refactor the way we do thread portability.

Instead of maintaining an incomplete emulation of POSIX threads for
Windows, let's use an extremely minimalist abstraction over both APIs
for now.  Small problems fixed: it's not OK to use (pthread_t) 0
as a special value, it's not OK to compare pthread_t values with ==, and
we incorrectly assumed that pthread functions set errno.

Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
---
 src/bin/pgbench/pgbench.c | 122 +++++++++-----------------------------
 1 file changed, 29 insertions(+), 93 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 31a4df45f5..44674d9179 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -110,22 +110,36 @@ typedef struct socket_set
 #endif							/* POLL_USING_SELECT */
 
 /*
- * Multi-platform pthread implementations
+ * Multi-platform thread implementations
  */
 
 #ifdef WIN32
 /* Use native win32 threads on Windows */
-typedef struct win32_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int	pthread_join(pthread_t th, void **thread_return);
+#include <windows.h>
+#define GETERRNO() (_dosmaperr(GetLastError()), errno)
+#define THREAD_T HANDLE
+#define THREAD_FUNC_RETURN_TYPE unsigned __stdcall	/* as _beginthreadex requires */
+#define THREAD_FUNC_RETURN return 0
+#define THREAD_CREATE(handle, function, arg) \
+	((*(handle) = (HANDLE) _beginthreadex(NULL, 0, (function), (arg), 0, NULL)) == 0 ? errno : 0)
+#define THREAD_JOIN(handle) \
+	(WaitForSingleObject((handle), INFINITE) != WAIT_OBJECT_0 ? \
+	GETERRNO() : CloseHandle((handle)) ? 0 : GETERRNO())
 #elif defined(ENABLE_THREAD_SAFETY)
 /* Use platform-dependent pthread capability */
 #include <pthread.h>
+#define THREAD_T pthread_t
+#define THREAD_FUNC_RETURN_TYPE void *
+#define THREAD_FUNC_RETURN return NULL
+#define THREAD_CREATE(handle, function, arg) \
+	pthread_create((handle), NULL, (function), (arg))
+#define THREAD_JOIN(handle) \
+	pthread_join((handle), NULL)
 #else
 /* No threads implementation, use none (-j 1) */
-#define pthread_t void *
+#define THREAD_T void *
+#define THREAD_FUNC_RETURN_TYPE void *
+#define THREAD_FUNC_RETURN return NULL
 #endif
 
 
@@ -435,7 +449,7 @@ typedef struct
 typedef struct
 {
 	int			tid;			/* thread id */
-	pthread_t	thread;			/* thread handle */
+	THREAD_T	thread;			/* thread handle */
 	CState	   *state;			/* array of CState */
 	int			nstate;			/* length of state[] */
 
@@ -458,8 +472,6 @@ typedef struct
 	int64		latency_late;	/* executed but late transactions */
 } TState;
 
-#define INVALID_THREAD		((pthread_t) 0)
-
 /*
  * queries read from files
  */
@@ -603,7 +615,7 @@ static void doLog(TState *thread, CState *st,
 static void processXactStats(TState *thread, CState *st, instr_time *now,
 							 bool skipped, StatsData *agg);
 static void addScript(ParsedScript script);
-static void *threadRun(void *arg);
+static THREAD_FUNC_RETURN_TYPE threadRun(void *arg);
 static void finishCon(CState *st);
 static void setalarm(int seconds);
 static socket_set *alloc_socket_set(int count);
@@ -6148,18 +6160,14 @@ main(int argc, char **argv)
 		/* the first thread (i = 0) is executed by main thread */
 		if (i > 0)
 		{
-			int			err = pthread_create(&thread->thread, NULL, threadRun, thread);
+			errno = THREAD_CREATE(&thread->thread, threadRun, thread);
 
-			if (err != 0 || thread->thread == INVALID_THREAD)
+			if (errno != 0)
 			{
 				pg_log_fatal("could not create thread: %m");
 				exit(1);
 			}
 		}
-		else
-		{
-			thread->thread = INVALID_THREAD;
-		}
 	}
 #else
 	INSTR_TIME_SET_CURRENT(threads[0].start_time);
@@ -6167,7 +6175,6 @@ main(int argc, char **argv)
 	if (duration > 0)
 		end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
 			(int64) 1000000 * duration;
-	threads[0].thread = INVALID_THREAD;
 #endif							/* ENABLE_THREAD_SAFETY */
 
 	/* wait for threads and accumulate results */
@@ -6178,12 +6185,12 @@ main(int argc, char **argv)
 		TState	   *thread = &threads[i];
 
 #ifdef ENABLE_THREAD_SAFETY
-		if (threads[i].thread == INVALID_THREAD)
+		if (i == 0)
 			/* actually run this thread directly in the main thread */
 			(void) threadRun(thread);
 		else
 			/* wait of other threads. should check that 0 is returned? */
-			pthread_join(thread->thread, NULL);
+			THREAD_JOIN(thread->thread);
 #else
 		(void) threadRun(thread);
 #endif							/* ENABLE_THREAD_SAFETY */
@@ -6222,7 +6229,7 @@ main(int argc, char **argv)
 	return exit_code;
 }
 
-static void *
+static THREAD_FUNC_RETURN_TYPE
 threadRun(void *arg)
 {
 	TState	   *thread = (TState *) arg;
@@ -6507,7 +6514,7 @@ done:
 		thread->logfile = NULL;
 	}
 	free_socket_set(sockets);
-	return NULL;
+	THREAD_FUNC_RETURN;
 }
 
 static void
@@ -6738,74 +6745,3 @@ socket_has_input(socket_set *sa, int fd, int idx)
 }
 
 #endif							/* POLL_USING_SELECT */
-
-
-/* partial pthread implementation for Windows */
-
-#ifdef WIN32
-
-typedef struct win32_pthread
-{
-	HANDLE		handle;
-	void	   *(*routine) (void *);
-	void	   *arg;
-	void	   *result;
-} win32_pthread;
-
-static unsigned __stdcall
-win32_pthread_run(void *arg)
-{
-	win32_pthread *th = (win32_pthread *) arg;
-
-	th->result = th->routine(th->arg);
-
-	return 0;
-}
-
-static int
-pthread_create(pthread_t *thread,
-			   pthread_attr_t *attr,
-			   void *(*start_routine) (void *),
-			   void *arg)
-{
-	int			save_errno;
-	win32_pthread *th;
-
-	th = (win32_pthread *) pg_malloc(sizeof(win32_pthread));
-	th->routine = start_routine;
-	th->arg = arg;
-	th->result = NULL;
-
-	th->handle = (HANDLE) _beginthreadex(NULL, 0, win32_pthread_run, th, 0, NULL);
-	if (th->handle == NULL)
-	{
-		save_errno = errno;
-		free(th);
-		return save_errno;
-	}
-
-	*thread = th;
-	return 0;
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
-	if (th == NULL || th->handle == NULL)
-		return errno = EINVAL;
-
-	if (WaitForSingleObject(th->handle, INFINITE) != WAIT_OBJECT_0)
-	{
-		_dosmaperr(GetLastError());
-		return errno;
-	}
-
-	if (thread_return)
-		*thread_return = th->result;
-
-	CloseHandle(th->handle);
-	free(th);
-	return 0;
-}
-
-#endif							/* WIN32 */
-- 
2.30.0

From a157bc61997db0333e49ed9b6dcd42d6cb55169f Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Thu, 4 Mar 2021 17:25:06 +1300
Subject: [PATCH v11 3/6] pgbench: Improve time measurement code.

XXX This needs a commit message.

Author: Fabien COELHO <coe...@cri.ensmp.fr>
Reviewed-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
---
 doc/src/sgml/ref/pgbench.sgml        |  39 +--
 src/bin/pgbench/pgbench.c            | 410 ++++++++++++---------------
 src/include/portability/instr_time.h |  28 ++
 3 files changed, 230 insertions(+), 247 deletions(-)

diff --git a/doc/src/sgml/ref/pgbench.sgml b/doc/src/sgml/ref/pgbench.sgml
index 2ec0580a79..3c3699cd73 100644
--- a/doc/src/sgml/ref/pgbench.sgml
+++ b/doc/src/sgml/ref/pgbench.sgml
@@ -58,8 +58,10 @@ number of clients: 10
 number of threads: 1
 number of transactions per client: 1000
 number of transactions actually processed: 10000/10000
-tps = 85.184871 (including connections establishing)
-tps = 85.296346 (excluding connections establishing)
+latency average = 11.013 ms
+latency stddev = 7.351 ms
+initial connection time = 45.758 ms
+tps = 896.967014 (without initial connection establishing)
 </screen>
 
   The first six lines report some of the most important parameter
@@ -68,8 +70,7 @@ tps = 85.296346 (excluding connections establishing)
   and number of transactions per client); these will be equal unless the run
   failed before completion.  (In <option>-T</option> mode, only the actual
   number of transactions is printed.)
-  The last two lines report the number of transactions per second,
-  figured with and without counting the time to start database sessions.
+  The last line reports the number of transactions per second.
  </para>
 
   <para>
@@ -2257,22 +2258,22 @@ number of clients: 10
 number of threads: 1
 number of transactions per client: 1000
 number of transactions actually processed: 10000/10000
-latency average = 15.844 ms
-latency stddev = 2.715 ms
-tps = 618.764555 (including connections establishing)
-tps = 622.977698 (excluding connections establishing)
+latency average = 10.870 ms
+latency stddev = 7.341 ms
+initial connection time = 30.954 ms
+tps = 907.949122 (without initial connection establishing)
 statement latencies in milliseconds:
-        0.002  \set aid random(1, 100000 * :scale)
-        0.005  \set bid random(1, 1 * :scale)
-        0.002  \set tid random(1, 10 * :scale)
-        0.001  \set delta random(-5000, 5000)
-        0.326  BEGIN;
-        0.603  UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
-        0.454  SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
-        5.528  UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
-        7.335  UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
-        0.371  INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
-        1.212  END;
+    0.001  \set aid random(1, 100000 * :scale)
+    0.001  \set bid random(1, 1 * :scale)
+    0.001  \set tid random(1, 10 * :scale)
+    0.000  \set delta random(-5000, 5000)
+    0.046  BEGIN;
+    0.151  UPDATE pgbench_accounts SET abalance = abalance + :delta WHERE aid = :aid;
+    0.107  SELECT abalance FROM pgbench_accounts WHERE aid = :aid;
+    4.241  UPDATE pgbench_tellers SET tbalance = tbalance + :delta WHERE tid = :tid;
+    5.245  UPDATE pgbench_branches SET bbalance = bbalance + :delta WHERE bid = :bid;
+    0.102  INSERT INTO pgbench_history (tid, bid, aid, delta, mtime) VALUES (:tid, :bid, :aid, :delta, CURRENT_TIMESTAMP);
+    0.974  END;
 </screen>
   </para>
 
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 44674d9179..40f8ae02a1 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -306,9 +306,9 @@ typedef struct SimpleStats
  */
 typedef struct StatsData
 {
-	time_t		start_time;		/* interval start time, for aggregates */
-	int64		cnt;			/* number of transactions, including skipped */
-	int64		skipped;		/* number of transactions skipped under --rate
+	pg_time_usec_t	start_time;	/* interval start time, for aggregates */
+	int64			cnt;		/* number of transactions, including skipped */
+	int64			skipped;	/* number of transactions skipped under --rate
 								 * and --latency-limit */
 	SimpleStats latency;
 	SimpleStats lag;
@@ -431,11 +431,11 @@ typedef struct
 	int			nvariables;		/* number of variables */
 	bool		vars_sorted;	/* are variables sorted by name? */
 
-	/* various times about current transaction */
-	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
-	int64		sleep_until;	/* scheduled start time of next cmd (usec) */
-	instr_time	txn_begin;		/* used for measuring schedule lag times */
-	instr_time	stmt_begin;		/* used for measuring statement latencies */
+	/* various times about current transaction in microseconds */
+	pg_time_usec_t	txn_scheduled;	/* scheduled start time of transaction */
+	pg_time_usec_t	sleep_until;	/* scheduled start time of next cmd */
+	pg_time_usec_t	txn_begin;		/* used for measuring schedule lag times */
+	pg_time_usec_t	stmt_begin;		/* used for measuring statement latencies */
 
 	bool		prepared[MAX_SCRIPTS];	/* whether client prepared the script */
 
@@ -463,13 +463,16 @@ typedef struct
 	RandomState ts_sample_rs;	/* random state for log sampling */
 
 	int64		throttle_trigger;	/* previous/next throttling (us) */
-	FILE	   *logfile;		/* where to log, or NULL */
+	FILE	   *logfile;			/* where to log, or NULL */
+
+	/* per thread collected stats in microseconds */
+	pg_time_usec_t	create_time;	/* thread creation time */
+	pg_time_usec_t	started_time;	/* thread is running */
+	pg_time_usec_t	bench_start; 	/* thread is benchmarking */
+	pg_time_usec_t	conn_duration;	/* cumulated connection and deconnection delays */
 
-	/* per thread collected stats */
-	instr_time	start_time;		/* thread start time */
-	instr_time	conn_time;
 	StatsData	stats;
-	int64		latency_late;	/* executed but late transactions */
+	int64		latency_late;	/* count executed but late transactions */
 } TState;
 
 /*
@@ -609,10 +612,10 @@ static void setIntValue(PgBenchValue *pv, int64 ival);
 static void setDoubleValue(PgBenchValue *pv, double dval);
 static bool evaluateExpr(CState *st, PgBenchExpr *expr,
 						 PgBenchValue *retval);
-static ConnectionStateEnum executeMetaCommand(CState *st, instr_time *now);
+static ConnectionStateEnum executeMetaCommand(CState *st, pg_time_usec_t *now);
 static void doLog(TState *thread, CState *st,
 				  StatsData *agg, bool skipped, double latency, double lag);
-static void processXactStats(TState *thread, CState *st, instr_time *now,
+static void processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 							 bool skipped, StatsData *agg);
 static void addScript(ParsedScript script);
 static THREAD_FUNC_RETURN_TYPE threadRun(void *arg);
@@ -1116,9 +1119,9 @@ mergeSimpleStats(SimpleStats *acc, SimpleStats *ss)
  * the given value.
  */
 static void
-initStats(StatsData *sd, time_t start_time)
+initStats(StatsData *sd, pg_time_usec_t start)
 {
-	sd->start_time = start_time;
+	sd->start_time = start;
 	sd->cnt = 0;
 	sd->skipped = 0;
 	initSimpleStats(&sd->latency);
@@ -2897,7 +2900,6 @@ evaluateSleep(CState *st, int argc, char **argv, int *usecs)
 static void
 advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 {
-	instr_time	now;
 
 	/*
 	 * gettimeofday() isn't free, so we get the current timestamp lazily the
@@ -2907,7 +2909,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 	 * means "not set yet".  Reset "now" when we execute shell commands or
 	 * expressions, which might take a non-negligible amount of time, though.
 	 */
-	INSTR_TIME_SET_ZERO(now);
+	pg_time_usec_t	now = 0;
 
 	/*
 	 * Loop in the state machine, until we have to wait for a result from the
@@ -2942,29 +2944,30 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 
 				/* Start new transaction (script) */
 			case CSTATE_START_TX:
+				pg_time_now_lazy(&now);
 
 				/* establish connection if needed, i.e. under --connect */
 				if (st->con == NULL)
 				{
-					instr_time	start;
+					pg_time_usec_t	start = now;
 
-					INSTR_TIME_SET_CURRENT_LAZY(now);
-					start = now;
 					if ((st->con = doConnect()) == NULL)
 					{
 						pg_log_error("client %d aborted while establishing connection", st->id);
 						st->state = CSTATE_ABORTED;
 						break;
 					}
-					INSTR_TIME_SET_CURRENT(now);
-					INSTR_TIME_ACCUM_DIFF(thread->conn_time, now, start);
+
+					/* reset now after connection */
+					now = pg_time_now();
+
+					thread->conn_duration += now - start;
 
 					/* Reset session-local state */
 					memset(st->prepared, 0, sizeof(st->prepared));
 				}
 
 				/* record transaction start time */
-				INSTR_TIME_SET_CURRENT_LAZY(now);
 				st->txn_begin = now;
 
 				/*
@@ -2972,7 +2975,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				 * scheduled start time.
 				 */
 				if (!throttle_delay)
-					st->txn_scheduled = INSTR_TIME_GET_MICROSEC(now);
+					st->txn_scheduled = now;
 
 				/* Begin with the first command */
 				st->state = CSTATE_START_COMMAND;
@@ -3008,12 +3011,9 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				 */
 				if (latency_limit)
 				{
-					int64		now_us;
+					pg_time_now_lazy(&now);
 
-					INSTR_TIME_SET_CURRENT_LAZY(now);
-					now_us = INSTR_TIME_GET_MICROSEC(now);
-
-					while (thread->throttle_trigger < now_us - latency_limit &&
+					while (thread->throttle_trigger < now - latency_limit &&
 						   (nxacts <= 0 || st->cnt < nxacts))
 					{
 						processXactStats(thread, st, &now, true, agg);
@@ -3046,9 +3046,9 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				 * Wait until it's time to start next transaction.
 				 */
 			case CSTATE_THROTTLE:
-				INSTR_TIME_SET_CURRENT_LAZY(now);
+				pg_time_now_lazy(&now);
 
-				if (INSTR_TIME_GET_MICROSEC(now) < st->txn_scheduled)
+				if (now < st->txn_scheduled)
 					return;		/* still sleeping, nothing to do here */
 
 				/* done sleeping, but don't start transaction if we're done */
@@ -3071,7 +3071,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				/* record begin time of next command, and initiate it */
 				if (report_per_command)
 				{
-					INSTR_TIME_SET_CURRENT_LAZY(now);
+					pg_time_now_lazy(&now);
 					st->stmt_begin = now;
 				}
 
@@ -3236,8 +3236,8 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				 * instead of CSTATE_START_TX.
 				 */
 			case CSTATE_SLEEP:
-				INSTR_TIME_SET_CURRENT_LAZY(now);
-				if (INSTR_TIME_GET_MICROSEC(now) < st->sleep_until)
+				pg_time_now_lazy(&now);
+				if (now < st->sleep_until)
 					return;		/* still sleeping, nothing to do here */
 				/* Else done sleeping. */
 				st->state = CSTATE_END_COMMAND;
@@ -3257,13 +3257,12 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				{
 					Command    *command;
 
-					INSTR_TIME_SET_CURRENT_LAZY(now);
+					pg_time_now_lazy(&now);
 
 					command = sql_script[st->use_file].commands[st->command];
 					/* XXX could use a mutex here, but we choose not to */
 					addToSimpleStats(&command->stats,
-									 INSTR_TIME_GET_DOUBLE(now) -
-									 INSTR_TIME_GET_DOUBLE(st->stmt_begin));
+									 PG_TIME_GET_DOUBLE(now - st->stmt_begin));
 				}
 
 				/* Go ahead with next command, to be executed or skipped */
@@ -3289,7 +3288,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
 				if (is_connect)
 				{
 					finishCon(st);
-					INSTR_TIME_SET_ZERO(now);
+					now = 0;
 				}
 
 				if ((st->cnt >= nxacts && duration <= 0) || timer_exceeded)
@@ -3327,7 +3326,7 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
  * take no time to execute.
  */
 static ConnectionStateEnum
-executeMetaCommand(CState *st, instr_time *now)
+executeMetaCommand(CState *st, pg_time_usec_t *now)
 {
 	Command    *command = sql_script[st->use_file].commands[st->command];
 	int			argc;
@@ -3369,8 +3368,8 @@ executeMetaCommand(CState *st, instr_time *now)
 			return CSTATE_ABORTED;
 		}
 
-		INSTR_TIME_SET_CURRENT_LAZY(*now);
-		st->sleep_until = INSTR_TIME_GET_MICROSEC(*now) + usec;
+		pg_time_now_lazy(now);
+		st->sleep_until = (*now) + usec;
 		return CSTATE_SLEEP;
 	}
 	else if (command->meta == META_SET)
@@ -3473,7 +3472,7 @@ executeMetaCommand(CState *st, instr_time *now)
 	 * executing the expression or shell command might have taken a
 	 * non-negligible amount of time, so reset 'now'
 	 */
-	INSTR_TIME_SET_ZERO(*now);
+	*now = 0;
 
 	return CSTATE_END_COMMAND;
 }
@@ -3483,14 +3482,15 @@ executeMetaCommand(CState *st, instr_time *now)
  *
  * We print Unix-epoch timestamps in the log, so that entries can be
  * correlated against other logs.  On some platforms this could be obtained
- * from the instr_time reading the caller has, but rather than get entangled
- * with that, we just eat the cost of an extra syscall in all cases.
+ * from the caller, but rather than get entangled with that, we just eat
+ * the cost of an extra syscall in all cases.
  */
 static void
 doLog(TState *thread, CState *st,
 	  StatsData *agg, bool skipped, double latency, double lag)
 {
 	FILE	   *logfile = thread->logfile;
+	pg_time_usec_t	now = pg_time_now();
 
 	Assert(use_log);
 
@@ -3510,13 +3510,12 @@ doLog(TState *thread, CState *st,
 		 * any empty intervals in between (this may happen with very low tps,
 		 * e.g. --rate=0.1).
 		 */
-		time_t		now = time(NULL);
 
 		while (agg->start_time + agg_interval <= now)
 		{
 			/* print aggregated report to logfile */
-			fprintf(logfile, "%ld " INT64_FORMAT " %.0f %.0f %.0f %.0f",
-					(long) agg->start_time,
+			fprintf(logfile, INT64_FORMAT " " INT64_FORMAT " %.0f %.0f %.0f %.0f",
+					agg->start_time,
 					agg->cnt,
 					agg->latency.sum,
 					agg->latency.sum2,
@@ -3544,17 +3543,13 @@ doLog(TState *thread, CState *st,
 	else
 	{
 		/* no, print raw transactions */
-		struct timeval tv;
-
-		gettimeofday(&tv, NULL);
 		if (skipped)
 			fprintf(logfile, "%d " INT64_FORMAT " skipped %d %ld %ld",
-					st->id, st->cnt, st->use_file,
-					(long) tv.tv_sec, (long) tv.tv_usec);
+					st->id, st->cnt, st->use_file, (long) (now / 1000000), (long) (now % 1000000));
 		else
 			fprintf(logfile, "%d " INT64_FORMAT " %.0f %d %ld %ld",
 					st->id, st->cnt, latency, st->use_file,
-					(long) tv.tv_sec, (long) tv.tv_usec);
+					(long) (now / 1000000), (long) (now % 1000000));	/* %ld takes long */
 		if (throttle_delay)
 			fprintf(logfile, " %.0f", lag);
 		fputc('\n', logfile);
@@ -3568,7 +3563,7 @@ doLog(TState *thread, CState *st,
  * Note that even skipped transactions are counted in the "cnt" fields.)
  */
 static void
-processXactStats(TState *thread, CState *st, instr_time *now,
+processXactStats(TState *thread, CState *st, pg_time_usec_t *now,
 				 bool skipped, StatsData *agg)
 {
 	double		latency = 0.0,
@@ -3578,11 +3573,11 @@ processXactStats(TState *thread, CState *st, instr_time *now,
 
 	if (detailed && !skipped)
 	{
-		INSTR_TIME_SET_CURRENT_LAZY(*now);
+		pg_time_now_lazy(now);
 
 		/* compute latency & lag */
-		latency = INSTR_TIME_GET_MICROSEC(*now) - st->txn_scheduled;
-		lag = INSTR_TIME_GET_MICROSEC(st->txn_begin) - st->txn_scheduled;
+		latency = (*now) - st->txn_scheduled;
+		lag = st->txn_begin - st->txn_scheduled;
 	}
 
 	if (thread_details)
@@ -3833,10 +3828,7 @@ initGenerateDataClientSide(PGconn *con)
 	int64		k;
 
 	/* used to track elapsed time and estimate of the remaining time */
-	instr_time	start,
-				diff;
-	double		elapsed_sec,
-				remaining_sec;
+	pg_time_usec_t	start;
 	int			log_interval = 1;
 
 	/* Stay on the same line if reporting to a terminal */
@@ -3888,7 +3880,7 @@ initGenerateDataClientSide(PGconn *con)
 	}
 	PQclear(res);
 
-	INSTR_TIME_SET_CURRENT(start);
+	start = pg_time_now();
 
 	for (k = 0; k < (int64) naccounts * scale; k++)
 	{
@@ -3913,11 +3905,8 @@ initGenerateDataClientSide(PGconn *con)
 		 */
 		if ((!use_quiet) && (j % 100000 == 0))
 		{
-			INSTR_TIME_SET_CURRENT(diff);
-			INSTR_TIME_SUBTRACT(diff, start);
-
-			elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
-			remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+			double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+			double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
 
 			fprintf(stderr, INT64_FORMAT " of " INT64_FORMAT " tuples (%d%%) done (elapsed %.2f s, remaining %.2f s)%c",
 					j, (int64) naccounts * scale,
@@ -3927,11 +3916,8 @@ initGenerateDataClientSide(PGconn *con)
 		/* let's not call the timing for each row, but only each 100 rows */
 		else if (use_quiet && (j % 100 == 0))
 		{
-			INSTR_TIME_SET_CURRENT(diff);
-			INSTR_TIME_SUBTRACT(diff, start);
-
-			elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
-			remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
+			double elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
+			double remaining_sec = ((double) scale * naccounts - j) * elapsed_sec / j;
 
 			/* have we reached the next interval (or end)? */
 			if ((j == scale * naccounts) || (elapsed_sec >= log_interval * LOG_STEP_SECONDS))
@@ -4136,10 +4122,8 @@ runInitSteps(const char *initialize_steps)
 
 	for (step = initialize_steps; *step != '\0'; step++)
 	{
-		instr_time	start;
 		char	   *op = NULL;
-
-		INSTR_TIME_SET_CURRENT(start);
+		pg_time_usec_t	start = pg_time_now();
 
 		switch (*step)
 		{
@@ -4181,12 +4165,7 @@ runInitSteps(const char *initialize_steps)
 
 		if (op != NULL)
 		{
-			instr_time	diff;
-			double		elapsed_sec;
-
-			INSTR_TIME_SET_CURRENT(diff);
-			INSTR_TIME_SUBTRACT(diff, start);
-			elapsed_sec = INSTR_TIME_GET_DOUBLE(diff);
+			double		elapsed_sec = PG_TIME_GET_DOUBLE(pg_time_now() - start);
 
 			if (!first)
 				appendPQExpBufferStr(&stats, ", ");
@@ -5108,12 +5087,12 @@ addScript(ParsedScript script)
  * progress report.  On exit, they are updated with the new stats.
  */
 static void
-printProgressReport(TState *threads, int64 test_start, int64 now,
+printProgressReport(TState *threads, int64 test_start, pg_time_usec_t now,
 					StatsData *last, int64 *last_report)
 {
 	/* generate and show report */
-	int64		run = now - *last_report,
-				ntx;
+	pg_time_usec_t	run = now - *last_report;
+	int64		ntx;
 	double		tps,
 				total_run,
 				latency,
@@ -5160,16 +5139,7 @@ printProgressReport(TState *threads, int64 test_start, int64 now,
 
 	if (progress_timestamp)
 	{
-		/*
-		 * On some platforms the current system timestamp is available in
-		 * now_time, but rather than get entangled with that, we just eat the
-		 * cost of an extra syscall in all cases.
-		 */
-		struct timeval tv;
-
-		gettimeofday(&tv, NULL);
-		snprintf(tbuf, sizeof(tbuf), "%ld.%03ld s",
-				 (long) tv.tv_sec, (long) (tv.tv_usec / 1000));
+		snprintf(tbuf, sizeof(tbuf), "%.3f s", PG_TIME_GET_DOUBLE(now));
 	}
 	else
 	{
@@ -5209,21 +5179,18 @@ printSimpleStats(const char *prefix, SimpleStats *ss)
 
 /* print out results */
 static void
-printResults(StatsData *total, instr_time total_time,
-			 instr_time conn_total_time, int64 latency_late)
+printResults(StatsData *total,
+			 pg_time_usec_t total_duration,			/* benchmarking time */
+			 pg_time_usec_t conn_total_duration,	/* is_connect */
+			 pg_time_usec_t conn_elapsed_duration,	/* !is_connect */
+			 int64 latency_late)
 {
-	double		time_include,
-				tps_include,
-				tps_exclude;
+	/* tps is about actually executed transactions during benchmarking */
 	int64		ntx = total->cnt - total->skipped;
+	double 		bench_duration = PG_TIME_GET_DOUBLE(total_duration);
+	double		tps = ntx / bench_duration;
 
-	time_include = INSTR_TIME_GET_DOUBLE(total_time);
-
-	/* tps is about actually executed transactions */
-	tps_include = ntx / time_include;
-	tps_exclude = ntx /
-		(time_include - (INSTR_TIME_GET_DOUBLE(conn_total_time) / nclients));
-
+	printf("pgbench (PostgreSQL) %d.%d\n", PG_VERSION_NUM / 10000, PG_VERSION_NUM % 100);
 	/* Report test parameters. */
 	printf("transaction type: %s\n",
 		   num_scripts == 1 ? sql_script[0].desc : "multiple scripts");
@@ -5254,8 +5221,7 @@ printResults(StatsData *total, instr_time total_time,
 
 	if (throttle_delay && latency_limit)
 		printf("number of transactions skipped: " INT64_FORMAT " (%.3f %%)\n",
-			   total->skipped,
-			   100.0 * total->skipped / total->cnt);
+			   total->skipped, 100.0 * total->skipped / total->cnt);
 
 	if (latency_limit)
 		printf("number of transactions above the %.1f ms latency limit: " INT64_FORMAT "/" INT64_FORMAT " (%.3f %%)\n",
@@ -5268,7 +5234,7 @@ printResults(StatsData *total, instr_time total_time,
 	{
 		/* no measurement, show average latency computed from run time */
 		printf("latency average = %.3f ms\n",
-			   1000.0 * time_include * nclients / total->cnt);
+			   0.001 * total_duration * nclients / total->cnt);
 	}
 
 	if (throttle_delay)
@@ -5283,8 +5249,25 @@ printResults(StatsData *total, instr_time total_time,
 			   0.001 * total->lag.sum / total->cnt, 0.001 * total->lag.max);
 	}
 
-	printf("tps = %f (including connections establishing)\n", tps_include);
-	printf("tps = %f (excluding connections establishing)\n", tps_exclude);
+	/*
+	 * Under -C/--connect, each transaction incurs a significant connection
+	 * cost, so it would not make much sense to exclude it from tps; the result
+	 * would not really be tps anyway.
+	 *
+	 * Otherwise connections are made just once at the beginning of the run
+	 * and should not impact performance except for very short runs, so they
+	 * are rightfully ignored in tps.
+	 */
+	if (is_connect)
+	{
+		printf("average connection time = %.3f ms\n", 0.001 * conn_total_duration / total->cnt);
+		printf("tps = %f (including reconnection times)\n", tps);
+	}
+	else
+	{
+		printf("initial connection time = %.3f ms\n", 0.001 * conn_elapsed_duration);
+		printf("tps = %f (without initial connection establishing)\n", tps);
+	}
 
 	/* Report per-script/command statistics */
 	if (per_script_stats || report_per_command)
@@ -5305,7 +5288,7 @@ printResults(StatsData *total, instr_time total_time,
 					   100.0 * sql_script[i].weight / total_weight,
 					   sstats->cnt,
 					   100.0 * sstats->cnt / total->cnt,
-					   (sstats->cnt - sstats->skipped) / time_include);
+					   (sstats->cnt - sstats->skipped) / bench_duration);
 
 				if (throttle_delay && latency_limit && sstats->cnt > 0)
 					printf(" - number of transactions skipped: " INT64_FORMAT " (%.3f%%)\n",
@@ -5353,10 +5336,7 @@ set_random_seed(const char *seed)
 	if (seed == NULL || strcmp(seed, "time") == 0)
 	{
 		/* rely on current time */
-		instr_time	now;
-
-		INSTR_TIME_SET_CURRENT(now);
-		iseed = (uint64) INSTR_TIME_GET_MICROSEC(now);
+		iseed = pg_time_now();
 	}
 	else if (strcmp(seed, "rand") == 0)
 	{
@@ -5459,9 +5439,10 @@ main(int argc, char **argv)
 	CState	   *state;			/* status of clients */
 	TState	   *threads;		/* array of thread */
 
-	instr_time	start_time;		/* start up time */
-	instr_time	total_time;
-	instr_time	conn_total_time;
+	pg_time_usec_t
+		start_time,				/* start up time */
+		bench_start = 0,		/* first recorded benchmarking time */
+		conn_total_duration;	/* cumulated connection time in threads */
 	int64		latency_late = 0;
 	StatsData	stats;
 	int			weight;
@@ -6137,62 +6118,51 @@ main(int argc, char **argv)
 	/* all clients must be assigned to a thread */
 	Assert(nclients_dealt == nclients);
 
-	/* get start up time */
-	INSTR_TIME_SET_CURRENT(start_time);
+	/* get start up time for the whole computation */
+	start_time = pg_time_now();
 
 	/* set alarm if duration is specified. */
 	if (duration > 0)
 		setalarm(duration);
 
-	/* start threads */
 #ifdef ENABLE_THREAD_SAFETY
-	for (i = 0; i < nthreads; i++)
+	/* start all threads but thread 0 which is executed directly later */
+	for (i = 1; i < nthreads; i++)
 	{
 		TState	   *thread = &threads[i];
 
-		INSTR_TIME_SET_CURRENT(thread->start_time);
-
-		/* compute when to stop */
-		if (duration > 0)
-			end_time = INSTR_TIME_GET_MICROSEC(thread->start_time) +
-				(int64) 1000000 * duration;
+		thread->create_time = pg_time_now();
+		errno = THREAD_CREATE(&thread->thread, threadRun, thread);
 
-		/* the first thread (i = 0) is executed by main thread */
-		if (i > 0)
+		if (errno != 0)
 		{
-			errno = THREAD_CREATE(&thread->thread, threadRun, thread);
-
-			if (errno != 0)
-			{
-				pg_log_fatal("could not create thread: %m");
-				exit(1);
-			}
+			pg_log_fatal("could not create thread: %m");
+			exit(1);
 		}
 	}
 #else
-	INSTR_TIME_SET_CURRENT(threads[0].start_time);
+	Assert(nthreads == 1);
+#endif							/* ENABLE_THREAD_SAFETY */
+
 	/* compute when to stop */
+	threads[0].create_time = pg_time_now();
 	if (duration > 0)
-		end_time = INSTR_TIME_GET_MICROSEC(threads[0].start_time) +
-			(int64) 1000000 * duration;
-#endif							/* ENABLE_THREAD_SAFETY */
+		end_time = threads[0].create_time + (int64) 1000000 * duration;
 
-	/* wait for threads and accumulate results */
+	/* run thread 0 directly */
+	(void) threadRun(&threads[0]);
+
+	/* wait for other threads and accumulate results */
 	initStats(&stats, 0);
-	INSTR_TIME_SET_ZERO(conn_total_time);
+	conn_total_duration = 0;
+
 	for (i = 0; i < nthreads; i++)
 	{
 		TState	   *thread = &threads[i];
 
 #ifdef ENABLE_THREAD_SAFETY
-		if (i == 0)
-			/* actually run this thread directly in the main thread */
-			(void) threadRun(thread);
-		else
-			/* wait of other threads. should check that 0 is returned? */
+		if (i > 0)
 			THREAD_JOIN(thread->thread);
-#else
-		(void) threadRun(thread);
 #endif							/* ENABLE_THREAD_SAFETY */
 
 		for (int j = 0; j < thread->nstate; j++)
@@ -6205,23 +6175,24 @@ main(int argc, char **argv)
 		stats.cnt += thread->stats.cnt;
 		stats.skipped += thread->stats.skipped;
 		latency_late += thread->latency_late;
-		INSTR_TIME_ADD(conn_total_time, thread->conn_time);
+		conn_total_duration += thread->conn_duration;
+
+		/* first recorded benchmarking start time */
+		if (bench_start == 0 || thread->bench_start < bench_start)
+			bench_start = thread->bench_start;
 	}
+
+	/* XXX should this be connection time? */
 	disconnect_all(state, nclients);
 
 	/*
-	 * XXX We compute results as though every client of every thread started
-	 * and finished at the same time.  That model can diverge noticeably from
-	 * reality for a short benchmark run involving relatively many threads.
-	 * The first thread may process notably many transactions before the last
-	 * thread begins.  Improving the model alone would bring limited benefit,
-	 * because performance during those periods of partial thread count can
-	 * easily exceed steady state performance.  This is one of the many ways
-	 * short runs convey deceptive performance figures.
+	 * Beware that performance of short benchmarks with many threads and possibly
+	 * long transactions can be deceptive because threads do not start and finish
+	 * at the exact same time. The total duration computed here encompasses all
+	 * transactions, so the tps shown is slightly underestimated.
 	 */
-	INSTR_TIME_SET_CURRENT(total_time);
-	INSTR_TIME_SUBTRACT(total_time, start_time);
-	printResults(&stats, total_time, conn_total_time, latency_late);
+	printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
+				 bench_start - start_time, latency_late);
 
 	if (exit_code != 0)
 		pg_log_fatal("Run was aborted; the above results are incomplete.");
@@ -6234,34 +6205,16 @@ threadRun(void *arg)
 {
 	TState	   *thread = (TState *) arg;
 	CState	   *state = thread->state;
-	instr_time	start,
-				end;
+	pg_time_usec_t	start;
 	int			nstate = thread->nstate;
 	int			remains = nstate;	/* number of remaining clients */
 	socket_set *sockets = alloc_socket_set(nstate);
-	int			i;
-
-	/* for reporting progress: */
-	int64		thread_start = INSTR_TIME_GET_MICROSEC(thread->start_time);
-	int64		last_report = thread_start;
-	int64		next_report = last_report + (int64) progress * 1000000;
+	int64		thread_start,
+				last_report,
+				next_report;
 	StatsData	last,
 				aggs;
 
-	/*
-	 * Initialize throttling rate target for all of the thread's clients.  It
-	 * might be a little more accurate to reset thread->start_time here too.
-	 * The possible drift seems too small relative to typical throttle delay
-	 * times to worry about it.
-	 */
-	INSTR_TIME_SET_CURRENT(start);
-	thread->throttle_trigger = INSTR_TIME_GET_MICROSEC(start);
-
-	INSTR_TIME_SET_ZERO(thread->conn_time);
-
-	initStats(&aggs, time(NULL));
-	last = aggs;
-
 	/* open log file if requested */
 	if (use_log)
 	{
@@ -6282,32 +6235,49 @@ threadRun(void *arg)
 		}
 	}
 
+	/* explicitly initialize the state machines */
+	for (int i = 0; i < nstate; i++)
+		state[i].state = CSTATE_CHOOSE_SCRIPT;
+
+	/* READY */
+	thread_start = pg_time_now();
+	thread->started_time = thread_start;
+	last_report = thread_start;
+	next_report = last_report + (int64) 1000000 * progress;
+
+	/* STEADY */
 	if (!is_connect)
 	{
 		/* make connections to the database before starting */
-		for (i = 0; i < nstate; i++)
+		for (int i = 0; i < nstate; i++)
 		{
 			if ((state[i].con = doConnect()) == NULL)
 				goto done;
 		}
-	}
 
-	/* time after thread and connections set up */
-	INSTR_TIME_SET_CURRENT(thread->conn_time);
-	INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
-
-	/* explicitly initialize the state machines */
-	for (i = 0; i < nstate; i++)
+		/* compute connection delay */
+		thread->conn_duration = pg_time_now() - thread->started_time;
+	}
+	else
 	{
-		state[i].state = CSTATE_CHOOSE_SCRIPT;
+		/* no connection delay to record */
+		thread->conn_duration = 0;
 	}
 
+
+	start = pg_time_now();
+	thread->bench_start = start;
+	thread->throttle_trigger = start;
+
+	initStats(&aggs, start);
+	last = aggs;
+
 	/* loop till all clients have terminated */
 	while (remains > 0)
 	{
 		int			nsocks;		/* number of sockets to be waited for */
-		int64		min_usec;
-		int64		now_usec = 0;	/* set this only if needed */
+		pg_time_usec_t	min_usec;
+		pg_time_usec_t	now = 0;	/* set this only if needed */
 
 		/*
 		 * identify which client sockets should be checked for input, and
@@ -6316,27 +6286,21 @@ threadRun(void *arg)
 		clear_socket_set(sockets);
 		nsocks = 0;
 		min_usec = PG_INT64_MAX;
-		for (i = 0; i < nstate; i++)
+		for (int i = 0; i < nstate; i++)
 		{
 			CState	   *st = &state[i];
 
 			if (st->state == CSTATE_SLEEP || st->state == CSTATE_THROTTLE)
 			{
 				/* a nap from the script, or under throttling */
-				int64		this_usec;
+				pg_time_usec_t		this_usec;
 
 				/* get current time if needed */
-				if (now_usec == 0)
-				{
-					instr_time	now;
-
-					INSTR_TIME_SET_CURRENT(now);
-					now_usec = INSTR_TIME_GET_MICROSEC(now);
-				}
+				pg_time_now_lazy(&now);
 
 				/* min_usec should be the minimum delay across all clients */
 				this_usec = (st->state == CSTATE_SLEEP ?
-							 st->sleep_until : st->txn_scheduled) - now_usec;
+							 st->sleep_until : st->txn_scheduled) - now;
 				if (min_usec > this_usec)
 					min_usec = this_usec;
 			}
@@ -6371,19 +6335,12 @@ threadRun(void *arg)
 		/* also wake up to print the next progress report on time */
 		if (progress && min_usec > 0 && thread->tid == 0)
 		{
-			/* get current time if needed */
-			if (now_usec == 0)
-			{
-				instr_time	now;
+			pg_time_now_lazy(&now);
 
-				INSTR_TIME_SET_CURRENT(now);
-				now_usec = INSTR_TIME_GET_MICROSEC(now);
-			}
-
-			if (now_usec >= next_report)
+			if (now >= next_report)
 				min_usec = 0;
-			else if ((next_report - now_usec) < min_usec)
-				min_usec = next_report - now_usec;
+			else if ((next_report - now) < min_usec)
+				min_usec = next_report - now;
 		}
 
 		/*
@@ -6432,7 +6389,7 @@ threadRun(void *arg)
 
 		/* ok, advance the state machine of each connection */
 		nsocks = 0;
-		for (i = 0; i < nstate; i++)
+		for (int i = 0; i < nstate; i++)
 		{
 			CState	   *st = &state[i];
 
@@ -6470,11 +6427,8 @@ threadRun(void *arg)
 		/* progress report is made by thread 0 for all threads */
 		if (progress && thread->tid == 0)
 		{
-			instr_time	now_time;
-			int64		now;
+			pg_time_usec_t	now = pg_time_now();
 
-			INSTR_TIME_SET_CURRENT(now_time);
-			now = INSTR_TIME_GET_MICROSEC(now_time);
 			if (now >= next_report)
 			{
 				/*
@@ -6492,17 +6446,17 @@ threadRun(void *arg)
 				 */
 				do
 				{
-					next_report += (int64) progress * 1000000;
+					next_report += (int64) 1000000 * progress;
 				} while (now >= next_report);
 			}
 		}
 	}
 
 done:
-	INSTR_TIME_SET_CURRENT(start);
+	start = pg_time_now();
 	disconnect_all(state, nstate);
-	INSTR_TIME_SET_CURRENT(end);
-	INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
+	thread->conn_duration += pg_time_now() - start;
+
 	if (thread->logfile)
 	{
 		if (agg_interval > 0)
diff --git a/src/include/portability/instr_time.h b/src/include/portability/instr_time.h
index 39a4f0600e..faf806a441 100644
--- a/src/include/portability/instr_time.h
+++ b/src/include/portability/instr_time.h
@@ -253,4 +253,32 @@ GetTimerFrequency(void)
 #define INSTR_TIME_SET_CURRENT_LAZY(t) \
 	(INSTR_TIME_IS_ZERO(t) ? INSTR_TIME_SET_CURRENT(t), true : false)
 
+/*
+ * Simpler convenient interface
+ *
+ * The instr_time type is expensive when dealing with time arithmetic.
+ * Define a type to hold microseconds on top of this, suitable for
+ * benchmarking performance measures, eg in "pgbench".
+ *
+ * A signed int64 of microseconds is good enough for about 292000 years.
+ */
+typedef int64 pg_time_usec_t;
+
+static inline pg_time_usec_t
+pg_time_now(void)
+{
+	instr_time now;
+
+	INSTR_TIME_SET_CURRENT(now);
+	return (pg_time_usec_t) INSTR_TIME_GET_MICROSEC(now);
+}
+
+static inline void
+pg_time_now_lazy(pg_time_usec_t *now)
+{
+	if ((*now) == 0)
+		(*now) = pg_time_now();
+}
+
+#define PG_TIME_GET_DOUBLE(t) (0.000001 * (t))
 #endif							/* INSTR_TIME_H */
-- 
2.30.0

From 8d3578ce82f25f0b6a1475cf7149d77e16515660 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.mu...@gmail.com>
Date: Mon, 18 Jan 2021 10:07:31 +1300
Subject: [PATCH v11 4/6] pgbench: Synchronize client threads.

Wait until all pgbench threads are connected before benchmarking begins.

Author: Andres Freund <and...@anarazel.de>
Author: Fabien COELHO <coe...@cri.ensmp.fr>
Reviewed-by: Marina Polyakova <m.polyak...@postgrespro.ru>
Reviewed-by: Kyotaro Horiguchi <horikyota....@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hay...@fujitsu.com>
Discussion: https://postgr.es/m/20200227180100.zyvjwzcpiokfsqm2%40alap3.anarazel.de
---
 src/bin/pgbench/pgbench.c | 45 ++++++++++++++++++++++++++++++++++++---
 1 file changed, 42 insertions(+), 3 deletions(-)

diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 40f8ae02a1..a71182805f 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -125,9 +125,17 @@ typedef struct socket_set
 #define THREAD_JOIN(handle) \
 	(WaitForSingleObject(handle, INFINITE) != WAIT_OBJECT_0 ? \
 	GETERRNO() : CloseHandle(handle) ? 0 : GETERRNO())
+#define THREAD_BARRIER_T SYNCHRONIZATION_BARRIER
+#define THREAD_BARRIER_INIT(barrier, n) \
+	(InitializeSynchronizationBarrier((barrier), (n), 0) ? 0 : GETERRNO())
+#define THREAD_BARRIER_WAIT(barrier) \
+	(EnterSynchronizationBarrier((barrier), \
+								 SYNCHRONIZATION_BARRIER_FLAGS_BLOCK_ONLY) ? \
+								 0 : GETERRNO())
+#define THREAD_BARRIER_DESTROY(barrier)
 #elif defined(ENABLE_THREAD_SAFETY)
 /* Use platform-dependent pthread capability */
-#include <pthread.h>
+#include "port/pg_pthread.h"
 #define THREAD_T pthread_t
 #define THREAD_FUNC_RETURN_TYPE void *
 #define THREAD_FUNC_RETURN return NULL
@@ -135,11 +143,20 @@ typedef struct socket_set
 	pthread_create((handle), NULL, (function), (arg))
 #define THREAD_JOIN(handle) \
 	pthread_join((handle), NULL)
+#define THREAD_BARRIER_T pthread_barrier_t
+#define THREAD_BARRIER_INIT(barrier, n) \
+	pthread_barrier_init((barrier), NULL, (n))
+#define THREAD_BARRIER_WAIT(barrier) pthread_barrier_wait((barrier))
+#define THREAD_BARRIER_DESTROY(barrier) pthread_barrier_destroy((barrier))
 #else
 /* No threads implementation, use none (-j 1) */
 #define THREAD_T void *
 #define THREAD_FUNC_RETURN_TYPE void *
 #define THREAD_FUNC_RETURN return NULL
+#define THREAD_BARRIER_T void *
+#define THREAD_BARRIER_INIT(barrier, n)
+#define THREAD_BARRIER_WAIT(barrier)
+#define THREAD_BARRIER_DESTROY(barrier)
 #endif
 
 
@@ -325,6 +342,9 @@ typedef struct RandomState
 /* Various random sequences are initialized from this one. */
 static RandomState base_random_sequence;
 
+/* Synchronization barrier for start and connection */
+static THREAD_BARRIER_T barrier;
+
 /*
  * Connection state machine states.
  */
@@ -467,8 +487,8 @@ typedef struct
 
 	/* per thread collected stats in microseconds */
 	pg_time_usec_t	create_time;	/* thread creation time */
-	pg_time_usec_t	started_time;	/* thread is running */
-	pg_time_usec_t	bench_start; 	/* thread is benchmarking */
+	pg_time_usec_t	started_time;	/* thread is running after start barrier */
+	pg_time_usec_t	bench_start; 	/* thread is benchmarking after connection barrier */
 	pg_time_usec_t	conn_duration;	/* cumulated connection and deconnection delays */
 
 	StatsData	stats;
@@ -6125,6 +6145,8 @@ main(int argc, char **argv)
 	if (duration > 0)
 		setalarm(duration);
 
+	THREAD_BARRIER_INIT(&barrier, nthreads);
+
 #ifdef ENABLE_THREAD_SAFETY
 	/* start all threads but thread 0 which is executed directly later */
 	for (i = 1; i < nthreads; i++)
@@ -6194,6 +6216,8 @@ main(int argc, char **argv)
 	printResults(&stats, pg_time_now() - bench_start, conn_total_duration,
 				 bench_start - start_time, latency_late);
 
+	THREAD_BARRIER_DESTROY(&barrier);
+
 	if (exit_code != 0)
 		pg_log_fatal("Run was aborted; the above results are incomplete.");
 
@@ -6240,6 +6264,8 @@ threadRun(void *arg)
 		state[i].state = CSTATE_CHOOSE_SCRIPT;
 
 	/* READY */
+	THREAD_BARRIER_WAIT(&barrier);
+
 	thread_start = pg_time_now();
 	thread->started_time = thread_start;
 	last_report = thread_start;
@@ -6252,7 +6278,18 @@ threadRun(void *arg)
 		for (int i = 0; i < nstate; i++)
 		{
 			if ((state[i].con = doConnect()) == NULL)
+			{
+				/*
+				 * On connection failure, we meet the barrier here in place of
+				 * GO before proceeding to the "done" path, which will clean up,
+				 * so that the other threads are not left waiting forever.
+				 *
+				 * It is unclear whether this is worth doing rather than simply
+				 * exiting with an error message.
+				 */
+				THREAD_BARRIER_WAIT(&barrier);
 				goto done;
+			}
 		}
 
 		/* compute connection delay */
@@ -6264,6 +6301,8 @@ threadRun(void *arg)
 		thread->conn_duration = 0;
 	}
 
+	/* GO */
+	THREAD_BARRIER_WAIT(&barrier);
 
 	start = pg_time_now();
 	thread->bench_start = start;
-- 
2.30.0

Reply via email to