This patch removes the pgbench thread fork-emulation code and simplifies
things where possible, especially around pthread_create and pthread_join.
The stats collection for the report is done directly instead of using an
intermediate structure.
As a result, if no thread implementation is available, pgbench is
restricted to work with only the main thread (i.e. "pgbench -j 1 ...").
== Rationale ==
Pgbench currently provides a thread emulation through process forks. This
feature was developed way back when it may have been common for some
platforms not to support threads. This is now very rare (can you name
one such platform?).
However, the thread fork-emulation feature has drawbacks. Namely,
processes are not threads and memory is not shared, which hinders a
simple implementation of some features, and results in those features
either not being provided under fork-emulation or behaving differently
under it:
Latency collection (-r) is not supported with fork emulation.
Progress (-P) is reported differently with fork emulation.
For a new feature under discussion, which consists in allowing one log
instead of per-thread logs, supporting fork-emulation requires a (heavy)
post-processing external sort phase, whereas with actual threads all
threads can share and append to the same log file with limited overhead,
which is significantly simpler.
== Note ==
This is a small regression (for platforms without thread support, -j J
with J > 1 is no longer supported after the patch), so maybe this should
be included for PostgreSQL 10.0 only? I do not think this is required,
but this is only my opinion.
--
Fabien.
diff --git a/src/bin/pgbench/pgbench.c b/src/bin/pgbench/pgbench.c
index 06a4dfd..989f151 100644
--- a/src/bin/pgbench/pgbench.c
+++ b/src/bin/pgbench/pgbench.c
@@ -70,20 +70,8 @@ static int pthread_join(pthread_t th, void **thread_return);
/* Use platform-dependent pthread capability */
#include <pthread.h>
#else
-/* Use emulation with fork. Rename pthread identifiers to avoid conflicts */
-#define PTHREAD_FORK_EMULATION
-#include <sys/wait.h>
-
-#define pthread_t pg_pthread_t
-#define pthread_attr_t pg_pthread_attr_t
-#define pthread_create pg_pthread_create
-#define pthread_join pg_pthread_join
-
-typedef struct fork_pthread *pthread_t;
-typedef int pthread_attr_t;
-
-static int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
-static int pthread_join(pthread_t th, void **thread_return);
+/* No threads implementation, use none (-j 1) */
+#define pthread_t void *
#endif
@@ -210,8 +198,6 @@ typedef struct
PGconn *con; /* connection handle to DB */
int id; /* client No. */
int state; /* state No. */
- int cnt; /* xacts count */
- int ecnt; /* error count */
int listen; /* 0 indicates that an async query has been
* sent */
int sleeping; /* 1 indicates that the client is napping */
@@ -221,15 +207,19 @@ typedef struct
int64 txn_scheduled; /* scheduled start time of transaction (usec) */
instr_time txn_begin; /* used for measuring schedule lag times */
instr_time stmt_begin; /* used for measuring statement latencies */
- int64 txn_latencies; /* cumulated latencies */
- int64 txn_sqlats; /* cumulated square latencies */
bool is_throttled; /* whether transaction throttling is done */
int use_file; /* index in sql_files for this client */
bool prepared[MAX_FILES];
+
+ /* per client collected stats */
+ int cnt; /* xacts count */
+ int ecnt; /* error count */
+ int64 txn_latencies; /* cumulated latencies */
+ int64 txn_sqlats; /* cumulated square latencies */
} CState;
/*
- * Thread state and result
+ * Thread state
*/
typedef struct
{
@@ -242,6 +232,9 @@ typedef struct
int *exec_count; /* number of cmd executions (per Command) */
unsigned short random_state[3]; /* separate randomness for each thread */
int64 throttle_trigger; /* previous/next throttling (us) */
+
+ /* per thread collected stats */
+ instr_time conn_time;
int64 throttle_lag; /* total transaction lag behind throttling */
int64 throttle_lag_max; /* max transaction lag */
int64 throttle_latency_skipped; /* lagging transactions skipped */
@@ -250,18 +243,6 @@ typedef struct
#define INVALID_THREAD ((pthread_t) 0)
-typedef struct
-{
- instr_time conn_time;
- int64 xacts;
- int64 latencies;
- int64 sqlats;
- int64 throttle_lag;
- int64 throttle_lag_max;
- int64 throttle_latency_skipped;
- int64 latency_late;
-} TResult;
-
/*
* queries read from files
*/
@@ -2895,6 +2876,13 @@ main(int argc, char **argv)
fprintf(stderr, "invalid number of threads: %d\n", nthreads);
exit(1);
}
+#if !defined(ENABLE_THREAD_SAFETY)
+ if (nthreads != 1)
+ {
+ fprintf(stderr, "no threads available, use only \"-j 1\"\n");
+ exit(1);
+ }
+#endif /* !ENABLE_THREAD_SAFETY */
break;
case 'C':
benchmarking_option_set = true;
@@ -3161,22 +3149,6 @@ main(int argc, char **argv)
}
/*
- * is_latencies only works with multiple threads in thread-based
- * implementations, not fork-based ones, because it supposes that the
- * parent can see changes made to the per-thread execution stats by child
- * threads. It seems useful enough to accept despite this limitation, but
- * perhaps we should FIXME someday (by passing the stats data back up
- * through the parent-to-child pipes).
- */
-#ifndef ENABLE_THREAD_SAFETY
- if (is_latencies && nthreads > 1)
- {
- fprintf(stderr, "-r does not work with -j larger than 1 on this platform.\n");
- exit(1);
- }
-#endif
-
- /*
* save main process id in the global variable because process id will be
* changed after fork.
*/
@@ -3372,6 +3344,7 @@ main(int argc, char **argv)
setalarm(duration);
/* start threads */
+#if defined(ENABLE_THREAD_SAFETY)
for (i = 0; i < nthreads; i++)
{
TState *thread = &threads[i];
@@ -3394,32 +3367,43 @@ main(int argc, char **argv)
thread->thread = INVALID_THREAD;
}
}
+#else
+ INSTR_TIME_SET_CURRENT(threads[0].start_time);
+ threads[0].thread = INVALID_THREAD;
+#endif /* ENABLE_THREAD_SAFETY */
/* wait for threads and accumulate results */
INSTR_TIME_SET_ZERO(conn_total_time);
for (i = 0; i < nthreads; i++)
{
- void *ret = NULL;
+ TState *thread = &threads[i];
+ int j;
+#if defined(ENABLE_THREAD_SAFETY)
if (threads[i].thread == INVALID_THREAD)
- ret = threadRun(&threads[i]);
+ /* actually run this thread directly in the main thread */
+ (void) threadRun(thread);
else
- pthread_join(threads[i].thread, &ret);
+ /* wait for other threads; the pthread_join result is not checked */
+ pthread_join(thread->thread, NULL);
+#else
+ (void) threadRun(thread);
+#endif /* ENABLE_THREAD_SAFETY */
- if (ret != NULL)
- {
- TResult *r = (TResult *) ret;
+ /* thread-level stats: sum the counters, keep the maximum lag */
+ throttle_lag += thread->throttle_lag;
+ throttle_latency_skipped += thread->throttle_latency_skipped;
+ latency_late += thread->latency_late;
+ if (thread->throttle_lag_max > throttle_lag_max)
+ throttle_lag_max = thread->throttle_lag_max;
+ INSTR_TIME_ADD(conn_total_time, thread->conn_time);
- total_xacts += r->xacts;
- total_latencies += r->latencies;
- total_sqlats += r->sqlats;
- throttle_lag += r->throttle_lag;
- throttle_latency_skipped += r->throttle_latency_skipped;
- latency_late += r->latency_late;
- if (r->throttle_lag_max > throttle_lag_max)
- throttle_lag_max = r->throttle_lag_max;
- INSTR_TIME_ADD(conn_total_time, r->conn_time);
- free(ret);
+ /* client-level stats: sum over this thread's clients (index j) */
+ for (j = 0; j < thread->nstate; j++)
+ {
+ total_xacts += thread->state[j].cnt;
+ total_latencies += thread->state[j].txn_latencies;
+ total_sqlats += thread->state[j].txn_sqlats;
}
}
disconnect_all(state, nclients);
@@ -3449,7 +3433,6 @@ threadRun(void *arg)
{
TState *thread = (TState *) arg;
CState *state = thread->state;
- TResult *result;
FILE *logfile = NULL; /* per-thread log file */
instr_time start,
end;
@@ -3480,9 +3463,7 @@ threadRun(void *arg)
thread->throttle_lag = 0;
thread->throttle_lag_max = 0;
- result = pg_malloc(sizeof(TResult));
-
- INSTR_TIME_SET_ZERO(result->conn_time);
+ INSTR_TIME_SET_ZERO(thread->conn_time);
/* open log file if requested */
if (use_log)
@@ -3513,8 +3494,8 @@ threadRun(void *arg)
}
/* time after thread and connections set up */
- INSTR_TIME_SET_CURRENT(result->conn_time);
- INSTR_TIME_SUBTRACT(result->conn_time, thread->start_time);
+ INSTR_TIME_SET_CURRENT(thread->conn_time);
+ INSTR_TIME_SUBTRACT(thread->conn_time, thread->start_time);
agg_vals_init(&aggs, thread->start_time);
@@ -3526,7 +3507,7 @@ threadRun(void *arg)
int prev_ecnt = st->ecnt;
st->use_file = getrand(thread, 0, num_files - 1);
- if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
remains--; /* I've aborted */
if (st->ecnt > prev_ecnt && commands[st->state]->type == META_COMMAND)
@@ -3641,7 +3622,7 @@ threadRun(void *arg)
if (st->con && (FD_ISSET(PQsocket(st->con), &input_mask)
|| commands[st->state]->type == META_COMMAND))
{
- if (!doCustom(thread, st, &result->conn_time, logfile, &aggs))
+ if (!doCustom(thread, st, &thread->conn_time, logfile, &aggs))
remains--; /* I've aborted */
}
@@ -3654,68 +3635,6 @@ threadRun(void *arg)
}
}
-#ifdef PTHREAD_FORK_EMULATION
- /* each process reports its own progression */
- if (progress)
- {
- instr_time now_time;
- int64 now;
-
- INSTR_TIME_SET_CURRENT(now_time);
- now = INSTR_TIME_GET_MICROSEC(now_time);
- if (now >= next_report)
- {
- /* generate and show report */
- int64 count = 0,
- lats = 0,
- sqlats = 0,
- skipped = 0;
- int64 lags = thread->throttle_lag;
- int64 run = now - last_report;
- double tps,
- total_run,
- latency,
- sqlat,
- stdev,
- lag;
-
- for (i = 0; i < nstate; i++)
- {
- count += state[i].cnt;
- lats += state[i].txn_latencies;
- sqlats += state[i].txn_sqlats;
- }
-
- total_run = (now - thread_start) / 1000000.0;
- tps = 1000000.0 * (count - last_count) / run;
- latency = 0.001 * (lats - last_lats) / (count - last_count);
- sqlat = 1.0 * (sqlats - last_sqlats) / (count - last_count);
- stdev = 0.001 * sqrt(sqlat - 1000000.0 * latency * latency);
- lag = 0.001 * (lags - last_lags) / (count - last_count);
- skipped = thread->throttle_latency_skipped - last_skipped;
-
- fprintf(stderr,
- "progress %d: %.1f s, %.1f tps, "
- "lat %.3f ms stddev %.3f",
- thread->tid, total_run, tps, latency, stdev);
- if (throttle_delay)
- {
- fprintf(stderr, ", lag %.3f ms", lag);
- if (latency_limit)
- fprintf(stderr, ", skipped " INT64_FORMAT, skipped);
- }
- fprintf(stderr, "\n");
-
- last_count = count;
- last_lats = lats;
- last_sqlats = sqlats;
- last_lags = lags;
- last_report = now;
- last_skipped = thread->throttle_latency_skipped;
- next_report += (int64) progress *1000000;
- }
- }
-#else
/* progress report by thread 0 for all threads */
if (progress && thread->tid == 0)
{
@@ -3779,31 +3698,16 @@ threadRun(void *arg)
next_report += (int64) progress *1000000;
}
}
-#endif /* PTHREAD_FORK_EMULATION */
}
done:
INSTR_TIME_SET_CURRENT(start);
disconnect_all(state, nstate);
- result->xacts = 0;
- result->latencies = 0;
- result->sqlats = 0;
- for (i = 0; i < nstate; i++)
- {
- result->xacts += state[i].cnt;
- result->latencies += state[i].txn_latencies;
- result->sqlats += state[i].txn_sqlats;
- }
- result->throttle_lag = thread->throttle_lag;
- result->throttle_lag_max = thread->throttle_lag_max;
- result->throttle_latency_skipped = thread->throttle_latency_skipped;
- result->latency_late = thread->latency_late;
-
INSTR_TIME_SET_CURRENT(end);
- INSTR_TIME_ACCUM_DIFF(result->conn_time, end, start);
+ INSTR_TIME_ACCUM_DIFF(thread->conn_time, end, start);
if (logfile)
fclose(logfile);
- return result;
+ return NULL;
}
/*
@@ -3825,90 +3729,6 @@ setalarm(int seconds)
alarm(seconds);
}
-#ifndef ENABLE_THREAD_SAFETY
-
-/*
- * implements pthread using fork.
- */
-
-typedef struct fork_pthread
-{
- pid_t pid;
- int pipes[2];
-} fork_pthread;
-
-static int
-pthread_create(pthread_t *thread,
- pthread_attr_t *attr,
- void *(*start_routine) (void *),
- void *arg)
-{
- fork_pthread *th;
- void *ret;
- int rc;
-
- th = (fork_pthread *) pg_malloc(sizeof(fork_pthread));
- if (pipe(th->pipes) < 0)
- {
- free(th);
- return errno;
- }
-
- th->pid = fork();
- if (th->pid == -1) /* error */
- {
- free(th);
- return errno;
- }
- if (th->pid != 0) /* in parent process */
- {
- close(th->pipes[1]);
- *thread = th;
- return 0;
- }
-
- /* in child process */
- close(th->pipes[0]);
-
- /* set alarm again because the child does not inherit timers */
- if (duration > 0)
- setalarm(duration);
-
- ret = start_routine(arg);
- rc = write(th->pipes[1], ret, sizeof(TResult));
- (void) rc;
- close(th->pipes[1]);
- free(th);
- exit(0);
-}
-
-static int
-pthread_join(pthread_t th, void **thread_return)
-{
- int status;
-
- while (waitpid(th->pid, &status, 0) != th->pid)
- {
- if (errno != EINTR)
- return errno;
- }
-
- if (thread_return != NULL)
- {
- /* assume result is TResult */
- *thread_return = pg_malloc(sizeof(TResult));
- if (read(th->pipes[0], *thread_return, sizeof(TResult)) != sizeof(TResult))
- {
- free(*thread_return);
- *thread_return = NULL;
- }
- }
- close(th->pipes[0]);
-
- free(th);
- return 0;
-}
-#endif
#else /* WIN32 */
static VOID CALLBACK
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers