On Sat, Jun 24, 2017 at 4:05 PM, Thomas Munro
<thomas.mu...@enterprisedb.com> wrote:
> On Fri, Jun 23, 2017 at 11:48 PM, Thomas Munro
> <thomas.mu...@enterprisedb.com> wrote:
>> Maybe it needs a better name.
>
> Ok, how about this: the feature could be called "synchronous replay".
> The new column in pg_stat_replication could be called sync_replay
> (like the other sync_XXX columns).  The GUCs could be called
> synchronous replay, synchronous_replay_max_lag and
> synchronous_replay_lease_time.  The language in log messages could
> refer to standbys "joining the synchronous replay set".

Feature hereby renamed that way.  It seems a lot more
self-explanatory.  Please see attached.

-- 
Thomas Munro
http://www.enterprisedb.com

Attachment: synchronous-replay-v1.patch
Description: Binary data

Attachment: test-synchronous-replay.sh
Description: Bourne shell script

/*
 * A simple test program to test performance and visibility with the
 * synchronous replay patch.
 *
 * Each test loop updates a row on the primary, and then optionally checks if
 * it can see that change immediately on a standby.  If you do this with
 * standard async replication, you should occasionally see an assertion fail
 * if run with --check (depending on the vaguaries of timing -- I can
 * reproduce this very reliably on my system).  If you do it with traditional
 * sync rep, it becomes a little bit less likely (but it's still reliably
 * reproducible on my system).  If you do it with traditional sync rep set up,
 * and "--synchronous-commit remote_apply" then it should no longer be
 * possible to trigger than assertion, but that mode doesn't handle failures.
 * If you do it with --synchronous-replay then you should not be able to
 * reproduce it, no matter which standby you connect to.  If you're using
 * --check and the standby gets dropped (perhaps because you
 * break/disconnect/pause it etc) you should never see that assertion fail
 * (= SELECT running but seeing stale data), instead you should see an error
 * when running the SELECT.
 *
 * Arguments:
 *
 *  --primary <connection-string>     how to connect to the primary
 *  --standby <connection-string>     how to connect to the standby to check
 *  --check                           check that the update is visible on standby
 *  --synchronous-replay              enable synchronous replay
 *  --synchronous-commit LEVEL        set synchronous_commit to LEVEL
 *  --two-phase-commit                enable two-phase commit
 *  --loops COUNT                     how many loops to run through
 *  --verbose                         chatter
 */

#include <libpq-fe.h>

#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>

static double
get_hires_time(void)
{
	struct timeval tv;

	gettimeofday(&tv, NULL);
	return tv.tv_sec + tv.tv_usec / 1000000.0;
}

int
main(int argc, char *argv[])
{
	PGconn *primary;
	PGconn *standby;
	PGresult *result;
	int i;
	int loops = 10000;
	char buffer[1024];
	const char *synchronous_commit = "on";
	bool synchronous_replay = false;
	const char *primary_connstr = "dbname=postgres port=5432";
	const char *standby_connstr = "dbname=postgres port=5441";
	bool check_applied = false;
	bool verbose = false;
	bool two_phase_commit = false;
	double start_time;

	for (i = 1; i != argc; ++i)
	{
		bool more = (i < argc - 1);

		if (strcmp(argv[i], "--verbose") == 0)
			verbose = true;
		else if (strcmp(argv[i], "--check") == 0)
			check_applied = true;
		else if (strcmp(argv[i], "--synchronous-commit") == 0 && more)
			synchronous_commit = argv[++i];
		else if (strcmp(argv[i], "--synchronous-replay") == 0)
			synchronous_replay = true;
		else if (strcmp(argv[i], "--primary") == 0 && more)
			primary_connstr = argv[++i];
		else if (strcmp(argv[i], "--standby") == 0 && more)
			standby_connstr = argv[++i];
		else if (strcmp(argv[i], "--loops") == 0 && more)
			loops = atoi(argv[++i]);
		else if (strcmp(argv[i], "--two-phase-commit") == 0)
			two_phase_commit = true;
		else
		{
			fprintf(stderr, "bad argument\n");
			exit(1);
		}
	}

	primary = PQconnectdb(primary_connstr);
	assert(PQstatus(primary) == CONNECTION_OK);

	standby = PQconnectdb(standby_connstr);
	assert(PQstatus(standby) == CONNECTION_OK);

	snprintf(buffer, sizeof(buffer), "SET synchronous_commit = %s", synchronous_commit);
	result = PQexec(primary, buffer);
	assert(PQresultStatus(result) == PGRES_COMMAND_OK);
	PQclear(result);
	if (synchronous_replay)
	{
		snprintf(buffer, sizeof(buffer), "SET synchronous_replay = on");
		result = PQexec(primary, buffer);
		assert(PQresultStatus(result) == PGRES_COMMAND_OK);
		PQclear(result);
	}

	snprintf(buffer, sizeof(buffer), "SET synchronous_commit = %s", synchronous_commit);
	result = PQexec(standby, buffer);
	assert(PQresultStatus(result) == PGRES_COMMAND_OK);
	PQclear(result);
	if (synchronous_replay)
	{
		snprintf(buffer, sizeof(buffer), "SET synchronous_replay = on");
		result = PQexec(standby, buffer);
		assert(PQresultStatus(result) == PGRES_COMMAND_OK);
		PQclear(result);
	}

	result = PQexec(primary, "CREATE TABLE counter AS SELECT 0 AS n");
	assert(PQresultStatus(result) == PGRES_COMMAND_OK ||
		 strcmp(PQresultErrorField(result, PG_DIAG_SQLSTATE), "42P07") == 0);
	PQclear(result);

	start_time = get_hires_time();

	for (i = 0; i < loops; ++i)
	{
		if (verbose)
			printf("Updating primary...\n");
		if (two_phase_commit)
		{
			result = PQexec(primary, "BEGIN");
			assert(PQresultStatus(result) == PGRES_COMMAND_OK);
			PQclear(result);
		}
		snprintf(buffer, sizeof(buffer), "UPDATE counter SET n = %d", i);
		result = PQexec(primary, buffer);
		assert(PQresultStatus(result) == PGRES_COMMAND_OK);
		PQclear(result);
		if (two_phase_commit)
		{
			snprintf(buffer, sizeof(buffer), "PREPARE TRANSACTION 'tx%d'", i);
			result = PQexec(primary, buffer);
			assert(PQresultStatus(result) == PGRES_COMMAND_OK);
			PQclear(result);
			snprintf(buffer, sizeof(buffer), "COMMIT PREPARED 'tx%d'", i);
			result = PQexec(primary, buffer);
			assert(PQresultStatus(result) == PGRES_COMMAND_OK);
			PQclear(result);
		}

		if (check_applied)
		{
			if (verbose)
				printf("Checking standby...\n");
			snprintf(buffer, sizeof(buffer), "SELECT n FROM counter");
			result = PQexec(standby, buffer);
			assert(PQresultStatus(result) == PGRES_TUPLES_OK);
			assert(PQntuples(result) == 1);
			assert(atoi(PQgetvalue(result, 0, 0)) == i);
			PQclear(result);
		}
	}

	printf("%0.3lf TPS\n", loops / (get_hires_time() - start_time));
	exit(0);
}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to