This patch removes the pgbench thread fork-emulation code and simplifies things where possible, especially around pthread_create and pthread_join. The stats collection for the report is done directly instead of using an intermediate structure.

As a result, if no thread implementation is available, pgbench is restricted to working with only the main thread (i.e., "pgbench -j 1 ...").


== Rationale ==

Pgbench currently provides thread emulation through process forks. This feature was developed way back when it may have been common for platforms not to support threads. This is now very rare (can you name one such platform?).

However, the thread fork-emulation feature has drawbacks. Namely, processes are not threads and their memory is not shared, which hinders the simple implementation of some features, results in some features not being provided under fork emulation, or makes them behave differently under fork emulation:

Latency collection (-r) is not supported with fork emulation.

Progress (-P) is reported differently with fork emulation.

For a new feature under discussion, which consists of allowing one log instead of per-thread logs, supporting fork emulation requires a (heavy) external sort as a post-processing phase, whereas with actual threads all threads can share and append to the same log file with limited overhead, which is significantly simpler.

== Note ==

This is a small regression (for platforms without thread support, -j J with J > 1 is no longer supported after the patch), so maybe this should be included for PostgreSQL 10.0 only? I do not think this should be required, but this is only my opinion.

--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 06a4dfd..989f151 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -70,20 +70,8 @@ static int	pthread_join(pthread_t th, void **thread_return);
 /* Use platform-dependent pthread capability */
 #include <pthread.h>
 #else
-/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
-#define PTHREAD_FORK_EMULATION
-#include <sys/wait.h>
-
-#define pthread_t				pg_pthread_t
-#define pthread_attr_t			pg_pthread_attr_t
-#define pthread_create			pg_pthread_create
-#define pthread_join			pg_pthread_join
-
-typedef struct fork_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int	pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int	pthread_join(pthread_t th, void **thread_return);
+/* No threads implementation, use none (-j 1) */
+#define pthread_t void *
 #endif
 
 
@@ -210,8 +198,6 @@ typedef struct
 	PGconn	   *con;			/* connection handle to DB */
 	int			id;				/* client No. */
 	int			state;			/* state No. */
-	int			cnt;			/* xacts count */
-	int			ecnt;			/* error count */
 	int			listen;			/* 0 indicates that an async query has been
 								 * sent */
 	int			sleeping;		/* 1 indicates that the client is napping */
@@ -221,15 +207,19 @@ typedef struct
 	int64		txn_scheduled;	/* scheduled start time of transaction (usec) */
 	instr_time	txn_begin;		/* used for measuring schedule lag times */
 	instr_time	stmt_begin;		/* used for measuring statement latencies */
-	int64		txn_latencies;	/* cumulated latencies */
-	int64		txn_sqlats;		/* cumulated square latencies */
 	bool		is_throttled;	/* whether transaction throttling is done */
 	int			use_file;		/* index in sql_files for this client */
 	bool		prepared[MAX_FILES];
+
+	/* per client collected stats */
+	int			cnt;			/* xacts count */
+	int			ecnt;			/* error count */
+	int64		txn_latencies;	/* cumulated latencies */
+	int64		txn_sqlats;		/* cumulated square latencies */
 } CState;
 
 /*
- * Thread state and result
+ * Thread state
  */
 typedef struct
 {
@@ -242,6 +232,9 @@ typedef struct
 	int		   *exec_count;		/* number of cmd executions (per Command) */
 	unsigned short random_state[3];		/* separate randomness for each thread */
 	int64		throttle_trigger;		/* previous/next throttling (us) */
+
+	/* per thread collected stats */
+	instr_time	conn_time;
 	int64		throttle_lag;	/* total transaction lag behind throttling */
 	int64		throttle_lag_max;		/* max transaction lag */
 	int64		throttle_latency_skipped; /* lagging transactions skipped */
@@ -250,18 +243,6 @@ typedef struct
 
 #define INVALID_THREAD		((pthread_t) 0)
 
-typedef struct
-{
-	instr_time	conn_time;
-	int64		xacts;
-	int64		latencies;
-	int64		sqlats;
-	int64		throttle_lag;
-	int64		throttle_lag_max;
-	int64		throttle_latency_skipped;
-	int64		latency_late;
-} TResult;
-
 /*
  * queries read from files
  */
@@ -2895,6 +2876,13 @@ main(int argc, char **argv)
 					fprintf(stderr, "invalid number of threads: %d\n", nthreads);
 					exit(1);
 				}
+#if !defined(ENABLE_THREAD_SAFETY)
+				if (nthreads != 1)
+				{
+					fprintf(stderr, "no threads available, use only \"-j 1\"\n");
+					exit(1);
+				}
+#endif /* !ENABLE_THREAD_SAFETY */
 				break;
 			case 'C':
 				benchmarking_option_set = true;
@@ -3161,22 +3149,6 @@ main(int argc, char **argv)
 	}
 
 	/*
-	 * is_latencies only works with multiple threads in thread-based
-	 * implementations, not fork-based ones, because it supposes that the
-	 * parent can see changes made to the per-thread execution stats by child
-	 * threads.  It seems useful enough to accept despite this limitation, but
-	 * perhaps we should FIXME someday (by passing the stats data back up
-	 * through the parent-to-child pipes).
-	 */
-#ifndef ENABLE_THREAD_SAFETY
-	if (is_latencies && nthreads > 1)
-	{
-		fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
-		exit(1);
-	}
-#endif
-
-	/*
 	 * save main process id in the global variable because process id will be
 	 * changed after fork.
 	 */
@@ -3372,6 +3344,7 @@ main(int argc, char **argv)
 		setalarm(duration);
 
 	/* start threads */
+#if defined(ENABLE_THREAD_SAFETY)
 	for (i = 0; i < nthreads; i++)
 	{
 		TState	   *thread = &threads[i];
@@ -3394,32 +3367,43 @@ main(int argc, char **argv)
 			thread->thread = INVALID_THREAD;
 		}
 	}
+#else
+	INSTR_TIME_SET_CURRENT(threads[0].start_time);
+	threads[0].thread = INVALID_THREAD;
+#endif /* ENABLE_THREAD_SAFETY */
 
 	/* wait for threads and accumulate results */
 	INSTR_TIME_SET_ZERO(conn_total_time);
 	for (i = 0; i < nthreads; i++)
 	{
-		void	   *ret = NULL;
+		TState	   *thread = &threads[i];
+		int j;
 
+#if defined(ENABLE_THREAD_SAFETY)
 		if (threads[i].thread == INVALID_THREAD)
-			ret = threadRun(&threads[i]);
+			/* actually run this thread directly in the main thread */
+			(void) threadRun(thread);
 		else
-			pthread_join(threads[i].thread, &ret);
+			/* wait for other threads. should check that 0 is returned? */
+			pthread_join(thread->thread, NULL);
+#else
+		(void) threadRun(thread);
+#endif /* ENABLE_THREAD_SAFETY */
 
-		if (ret != NULL)
-		{
-			TResult    *r = (TResult *) ret;
+		/* thread level stats: sum the counters, keep the largest lag */
+		throttle_lag += thread->throttle_lag;
+		throttle_latency_skipped += thread->throttle_latency_skipped;
+		latency_late += thread->latency_late;
+		if (thread->throttle_lag_max > throttle_lag_max)
+			throttle_lag_max = thread->throttle_lag_max;
+		INSTR_TIME_ADD(conn_total_time, thread->conn_time);
 
-			total_xacts += r->xacts;
-			total_latencies += r->latencies;
-			total_sqlats += r->sqlats;
-			throttle_lag += r->throttle_lag;
-			throttle_latency_skipped += r->throttle_latency_skipped;
-			latency_late += r->latency_late;
-			if (r->throttle_lag_max > throttle_lag_max)
-				throttle_lag_max = r->throttle_lag_max;
-			INSTR_TIME_ADD(conn_total_time, r->conn_time);
-			free(ret);
+		/* client-level stats: j indexes this thread's own clients */
+		for (j = 0; j < thread->nstate; j++)
+		{
+			total_xacts += thread->state[j].cnt;
+			total_latencies += thread->state[j].txn_latencies;
+			total_sqlats += thread->state[j].txn_sqlats;
 		}
 	}
 	disconnect_all(state, nclients);
@@ -3449,7 +3433,6 @@ threadRun(void *arg)
 {
 	TState	   *thread = (TState *) arg;
 	CState	   *state = thread->state;
-	TResult    *result;
 	FILE	   *logfile = NULL; /* per-thread log file */
 	instr_time	start,
 				end;
@@ -3480,9 +3463,7 @@ threadRun(void *arg)
 	thread->throttle_lag = 0;
 	thread->throttle_lag_max = 0;
 
-	result = pg_malloc(sizeof(TResult));
-
-	INSTR_TIME_SET_ZERO(result->conn_time);
+	INSTR_TIME_SET_ZERO(thread->conn_time);
 
 	/* open log file if requested */
 	if (use_log)
@@ -3513,8 +3494,8 @@ threadRun(void *arg)
 	}
 
 	/* time after thread and connections set up */
-	INSTR_TIME_SET_CURRENT(result->conn_time);
-	INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+	INSTR_TIME_SET_CURRENT(thread->conn_time);
+	INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
 
 	agg_vals_init(&aggs, thread->start_time);
 
@@ -3526,7 +3507,7 @@ threadRun(void *arg)
 		int			prev_ecnt = st->ecnt;
 
 		st->use_file = getrand(thread, 0, num_files - 1);
-		if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+		if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
 			remains--;			/* I've aborted */
 
 		if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3641,7 +3622,7 @@ threadRun(void *arg)
 			if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
 							|| commands[st->state]->type == META_COMMAND))
 			{
-				if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+				if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
 					remains--;	/* I've aborted */
 			}
 
@@ -3654,68 +3635,6 @@ threadRun(void *arg)
 			}
 		}
 
-#ifdef PTHREAD_FORK_EMULATION
-		/* each process reports its own progression */
-		if (progress)
-		{
-			instr_time	now_time;
-			int64		now;
-
-			INSTR_TIME_SET_CURRENT(now_time);
-			now = INSTR_TIME_GET_MICROSEC(now_time);
-			if (now >= next_report)
-			{
-				/* generate and show report */
-				int64		count = 0,
-							lats = 0,
-							sqlats = 0,
-							skipped = 0;
-				int64		lags = thread->throttle_lag;
-				int64		run = now - last_report;
-				double		tps,
-							total_run,
-							latency,
-							sqlat,
-							stdev,
-							lag;
-
-				for (i = 0; i < nstate; i++)
-				{
-					count += state[i].cnt;
-					lats += state[i].txn_latencies;
-					sqlats += state[i].txn_sqlats;
-				}
-
-				total_run = (now - thread_start) / 1000000.0;
-				tps = 1000000.0 * (count - last_count) / run;
-				latency = 0.001 * (lats - last_lats) / (count - last_count);
-				sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
-				stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
-				lag = 0.001 * (lags - last_lags) / (count - last_count);
-				skipped = thread->throttle_latency_skipped - last_skipped;
-
-				fprintf(stderr,
-						"progress %d: %.1f s, %.1f tps, "
-						"lat %.3f ms stddev %.3f",
-						thread->tid, total_run, tps, latency, stdev);
-				if (throttle_delay)
-				{
-					fprintf(stderr, ", lag %.3f ms", lag);
-					if (latency_limit)
-						fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
-				}
-				fprintf(stderr, "\n");
-
-				last_count = count;
-				last_lats = lats;
-				last_sqlats = sqlats;
-				last_lags = lags;
-				last_report = now;
-				last_skipped = thread->throttle_latency_skipped;
-				next_report += (int64) progress *1000000;
-			}
-		}
-#else
 		/* progress report by thread 0 for all threads */
 		if (progress && thread->tid == 0)
 		{
@@ -3779,31 +3698,16 @@ threadRun(void *arg)
 				next_report += (int64) progress *1000000;
 			}
 		}
-#endif   /* PTHREAD_FORK_EMULATION */
 	}
 
 done:
 	INSTR_TIME_SET_CURRENT(start);
 	disconnect_all(state, nstate);
-	result->xacts = 0;
-	result->latencies = 0;
-	result->sqlats = 0;
-	for (i = 0; i < nstate; i++)
-	{
-		result->xacts += state[i].cnt;
-		result->latencies += state[i].txn_latencies;
-		result->sqlats += state[i].txn_sqlats;
-	}
-	result->throttle_lag = thread->throttle_lag;
-	result->throttle_lag_max = thread->throttle_lag_max;
-	result->throttle_latency_skipped = thread->throttle_latency_skipped;
-	result->latency_late = thread->latency_late;
-
 	INSTR_TIME_SET_CURRENT(end);
-	INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+	INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
 	if (logfile)
 		fclose(logfile);
-	return result;
+	return NULL;
 }
 
 /*
@@ -3825,90 +3729,6 @@ setalarm(int seconds)
 	alarm(seconds);
 }
 
-#ifndef ENABLE_THREAD_SAFETY
-
-/*
- * implements pthread using fork.
- */
-
-typedef struct fork_pthread
-{
-	pid_t		pid;
-	int			pipes[2];
-}	fork_pthread;
-
-static int
-pthread_create(pthread_t *thread,
-			   pthread_attr_t *attr,
-			   void *(*start_routine) (void *),
-			   void *arg)
-{
-	fork_pthread *th;
-	void	   *ret;
-	int			rc;
-
-	th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
-	if (pipe(th->pipes) < 0)
-	{
-		free(th);
-		return errno;
-	}
-
-	th->pid = fork();
-	if (th->pid == -1)			/* error */
-	{
-		free(th);
-		return errno;
-	}
-	if (th->pid != 0)			/* in parent process */
-	{
-		close(th->pipes[1]);
-		*thread = th;
-		return 0;
-	}
-
-	/* in child process */
-	close(th->pipes[0]);
-
-	/* set alarm again because the child does not inherit timers */
-	if (duration > 0)
-		setalarm(duration);
-
-	ret = start_routine(arg);
-	rc = write(th->pipes[1], ret, sizeof(TResult));
-	(void) rc;
-	close(th->pipes[1]);
-	free(th);
-	exit(0);
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
-	int			status;
-
-	while (waitpid(th->pid, &status, 0) != th->pid)
-	{
-		if (errno != EINTR)
-			return errno;
-	}
-
-	if (thread_return != NULL)
-	{
-		/* assume result is TResult */
-		*thread_return = pg_malloc(sizeof(TResult));
-		if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
-		{
-			free(*thread_return);
-			*thread_return = NULL;
-		}
-	}
-	close(th->pipes[0]);
-
-	free(th);
-	return 0;
-}
-#endif
 #else							/* WIN32 */
 
 static VOID CALLBACK
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to