Tom Lane wrote:
> Heikki Linnakangas <heikki.linnakan...@enterprisedb.com> writes:
>> Before we sprinkle all the global variables it touches with that, let me
>> explain what I meant by dividing walreceiver code differently between
>> dynamically loaded module and backend code. Right now I have to go to
>> sleep, though, but I'll try to get back to it during the weekend.
> 
> Yeah, nothing to be done till we get another buildfarm cycle anyway.

Ok, looks like you did that anyway, let's see if it fixed it. Thanks.

So what I'm playing with is to pull walreceiver back into the backend
executable. To avoid the link dependency, walreceiver doesn't access
libpq directly, but loads a module dynamically which implements this
interface:

bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)

Establishes a connection to the primary and starts streaming from 'startpoint'.
Returns true on success.

bool walrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int
*len)

Retrieve any WAL record available through the connection, blocking for
a maximum of 'timeout' ms.

void walrcv_disconnect(void);

Disconnect.


This is the kind of API Greg Stark requested earlier
(http://archives.postgresql.org/message-id/407d949e0912220336u595a05e0x20bd91b9fbc08...@mail.gmail.com),
though I'm not planning to make it pluggable for 3rd party
implementations yet.

The module doesn't need to touch backend internals much at all, no
tinkering with shared memory for example, so I would feel much better
about moving that out of src/backend. Not sure where, though; it's not
an executable, so src/bin is hardly the right place, but I wouldn't want
to put it in contrib either, because it should still be built and
installed by default. So I'm inclined to still leave it in
src/backend/replication/

I've pushed that 'replication-dynmodule' branch in my git repo. The diff
is hard to read, because it mostly just moves code around, but I've
attached libpqwalreceiver.c here, which is the dynamic module part. You
can also browse the tree via the web interface
(http://git.postgresql.org/gitweb?p=users/heikki/postgres.git;a=tree;h=refs/heads/replication-dynmodule;hb=replication-dynmodule)

I like this division of labor much more than making the whole
walreceiver process a dynamically loaded module, so barring objections I
will review and test this more, and commit next week.

-- 
  Heikki Linnakangas
  EnterpriseDB   http://www.enterprisedb.com
/*-------------------------------------------------------------------------
 *
 * libpqwalreceiver.c
 *
 * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It
 * is the process in the standby server that takes charge of receiving
 * XLOG records from a primary server during streaming replication.
 *
 * When the startup process determines that it's time to start streaming,
 * it instructs postmaster to start walreceiver. Walreceiver first
 * connects to the primary server (it will be served by a walsender process
 * in the primary server), and then keeps receiving XLOG records and
 * writing them to the disk as long as the connection is alive. As XLOG
 * records are received and flushed to disk, it updates the
 * WalRcv->receivedUpTo variable in shared memory, to inform the startup
 * process of how far it can proceed with XLOG replay.
 *
 * Normal termination is by SIGTERM, which instructs the walreceiver to
 * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
 * process, the walreceiver will simply abort and exit on SIGQUIT. A close
 * of the connection and a FATAL error are treated not as a crash but as
 * normal operation.
 *
 * This file contains the libpq-specific part of walreceiver. It is compiled
 * as a dynamic module to avoid linking libpq with the main server binary.
 *
 * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
 *
 *
 * IDENTIFICATION
 *	  $PostgreSQL$
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <unistd.h>

#include "libpq-fe.h"
#include "access/xlog.h"
#include "miscadmin.h"
#include "replication/walreceiver.h"
#include "utils/builtins.h"

#ifdef HAVE_POLL_H
#include <poll.h>
#endif
#ifdef HAVE_SYS_POLL_H
#include <sys/poll.h>
#endif
#ifdef HAVE_SYS_SELECT_H
#include <sys/select.h>
#endif

PG_MODULE_MAGIC;

void		_PG_init(void);

/* streamConn is a PGconn object of a connection to walsender from walreceiver */
static PGconn *streamConn = NULL;
static bool justconnected = false;

/* Buffer for currently read records */
static char *recvBuf = NULL;

/* Prototypes for interface functions */
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
			  int *len);
static void libpqrcv_disconnect(void);

/* Prototypes for private functions */
static bool libpq_select(int timeout_ms);

/*
 * _PG_init
 *
 * Called when this module is loaded. Points the walrcv_connect,
 * walrcv_receive and walrcv_disconnect variables at the libpq-based
 * implementations defined in this file.
 */
void
_PG_init(void)
{
	walrcv_disconnect = libpqrcv_disconnect;
	walrcv_receive = libpqrcv_receive;
	walrcv_connect = libpqrcv_connect;
}

/*
 * Establish the connection to the primary server for XLOG streaming.
 *
 * Connects using 'conninfo' with " replication=true" appended, runs
 * IDENTIFY_SYSTEM and verifies that the primary's system identifier and
 * timeline match ours, then issues START_REPLICATION at 'startpoint'.
 *
 * On success, streamConn holds the open connection, ThisTimeLineID is set
 * to the primary's timeline, and true is returned. Every failure is
 * reported with ereport(ERROR).
 */
static bool
libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
{
	/*
	 * sizeof the literal accounts for the appended option plus the
	 * terminating NUL, so a conninfo of up to MAXCONNINFO characters is
	 * never silently truncated.
	 */
	char		conninfo_repl[MAXCONNINFO + sizeof(" replication=true")];
	char	   *primary_sysid;
	char		standby_sysid[32];
	TimeLineID	primary_tli;
	TimeLineID	standby_tli;
	PGresult   *res;
	char		cmd[64];

	Assert(startpoint.xlogid != 0 || startpoint.xrecoff != 0);

	/*
	 * Set up a connection for XLOG streaming
	 */
	snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);

	streamConn = PQconnectdb(conninfo_repl);
	if (PQstatus(streamConn) != CONNECTION_OK)
		ereport(ERROR,
				(errmsg("could not connect to the primary server : %s",
						PQerrorMessage(streamConn))));

	/*
	 * Get the system identifier and timeline ID as a DataRow message
	 * from the primary server.
	 */
	res = PQexec(streamConn, "IDENTIFY_SYSTEM");
	if (PQresultStatus(res) != PGRES_TUPLES_OK)
	{
		/* The error text lives in the connection, so res can go first */
		PQclear(res);
		ereport(ERROR,
				(errmsg("could not receive the SYSID and timeline ID from "
						"the primary server: %s",
						PQerrorMessage(streamConn))));
	}
	if (PQnfields(res) != 2 || PQntuples(res) != 1)
	{
		int			ntuples = PQntuples(res);
		int			nfields = PQnfields(res);

		PQclear(res);
		ereport(ERROR,
				(errmsg("invalid response from primary server"),
				 errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
						   ntuples, nfields)));
	}

	/*
	 * PQgetvalue() returns a pointer into the PGresult's own storage, which
	 * PQclear() releases. Take a private copy of the system identifier so
	 * that it is still valid when used in the error report below.
	 */
	primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
	primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
	PQclear(res);

	/*
	 * Confirm that the system identifier of the primary is the same
	 * as ours.
	 */
	snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
			 GetSystemIdentifier());
	if (strcmp(primary_sysid, standby_sysid) != 0)
		ereport(ERROR,
				(errmsg("system differs between the primary and standby"),
				 errdetail("the primary SYSID is %s, standby SYSID is %s",
						   primary_sysid, standby_sysid)));
	pfree(primary_sysid);

	/*
	 * Confirm that the current timeline of the primary is the same
	 * as the recovery target timeline.
	 */
	standby_tli = GetRecoveryTargetTLI();
	if (primary_tli != standby_tli)
		ereport(ERROR,
				(errmsg("timeline %u of the primary does not match recovery target timeline %u",
						primary_tli, standby_tli)));
	ThisTimeLineID = primary_tli;

	/* Start streaming from the point requested by startup process */
	snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
			 startpoint.xlogid, startpoint.xrecoff);
	res = PQexec(streamConn, cmd);
	if (PQresultStatus(res) != PGRES_COPY_OUT)
	{
		/* Don't leak the result; the message is read from the connection */
		PQclear(res);
		ereport(ERROR,
				(errmsg("could not start XLOG streaming: %s",
						PQerrorMessage(streamConn))));
	}
	PQclear(res);

	/*
	 * Tell libpqrcv_receive() not to wait on the socket on its first call
	 * after connecting.
	 */
	justconnected = true;

	return true;
}

/*
 * Block until the streaming connection's socket becomes readable, or until
 * 'timeout_ms' milliseconds have passed.
 *
 * Returns true when the socket is reported ready, and false when the wait
 * timed out or was interrupted by a signal (EINTR). A closed socket or any
 * other wait failure is reported with ereport(ERROR).
 *
 * This is based on pqSocketCheck.
 */
static bool
libpq_select(int timeout_ms)
{
	int			sock;
	int			nready;

	Assert(streamConn != NULL);

	sock = PQsocket(streamConn);
	if (sock < 0)
		ereport(ERROR,
				(errcode_for_socket_access(),
				 errmsg("socket not open")));

	/* Wait with poll(2) where it exists, falling back to select(2) */
#ifdef HAVE_POLL
	{
		struct pollfd pfd;

		pfd.fd = sock;
		pfd.events = POLLIN | POLLERR;
		pfd.revents = 0;

		nready = poll(&pfd, 1, timeout_ms);
	}
#else							/* !HAVE_POLL */
	{
		fd_set		readfds;
		struct timeval tv;
		struct timeval *tvp = NULL;		/* NULL: wait without a time limit */

		FD_ZERO(&readfds);
		FD_SET(sock, &readfds);

		if (timeout_ms >= 0)
		{
			tv.tv_sec = timeout_ms / 1000;
			tv.tv_usec = (timeout_ms % 1000) * 1000;
			tvp = &tv;
		}

		nready = select(sock + 1, &readfds, NULL, NULL, tvp);
	}
#endif   /* HAVE_POLL */

	if (nready < 0)
	{
		/* A signal interrupting the wait is not an error */
		if (errno == EINTR)
			return false;

		ereport(ERROR,
				(errcode_for_socket_access(),
				 errmsg("select() failed: %m")));
	}

	/* Zero means the timeout expired with nothing to read */
	return nready != 0;
}

/*
 * Close the streaming connection to the primary server.
 *
 * streamConn is reset to NULL afterwards so that no dangling PGconn
 * pointer is left behind: a repeated call is then harmless, and the
 * Assert in libpq_select() can detect use of a closed connection.
 */
static void
libpqrcv_disconnect(void)
{
	PQfinish(streamConn);
	streamConn = NULL;
	justconnected = false;
}

/*
 * Receive any WAL records available from XLOG stream, blocking for
 * maximum of 'timeout' ms.
 *
 * Returns:
 *
 *   True if data was received. *recptr, *buffer and *len are set to
 *   the WAL location of the received data, buffer holding it, and length,
 *   respectively.
 *
 *   False if no data was available within timeout, or wait was interrupted
 *   by signal.
 *
 * The buffer returned is only valid until the next call of this function or
 * libpqrcv_connect/libpqrcv_disconnect.
 *
 * ereports on error.
 */
static bool
libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
{
	int			rawlen;

	/* Release the buffer handed out by the previous call, if any */
	if (recvBuf != NULL)
		PQfreemem(recvBuf);
	recvBuf = NULL;

	/*
	 * If the caller requested to block, wait for data to arrive. But if
	 * this is the first call after connecting, don't wait, because
	 * there might already be some data in libpq buffer that we haven't
	 * returned to caller.
	 */
	if (timeout > 0 && !justconnected)
	{
		if (!libpq_select(timeout))
			return false;

		if (PQconsumeInput(streamConn) == 0)
			ereport(ERROR,
					(errmsg("could not read xlog records: %s",
							PQerrorMessage(streamConn))));
	}
	justconnected = false;

	/* Receive CopyData message (third argument 1 = don't block) */
	rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
	if (rawlen == 0)	/* no records available yet, then return */
		return false;
	if (rawlen == -1)	/* end-of-streaming or error */
	{
		PGresult   *res = PQgetResult(streamConn);
		bool		ended_cleanly = (PQresultStatus(res) == PGRES_COMMAND_OK);

		PQclear(res);
		if (ended_cleanly)
			ereport(ERROR,
					(errmsg("replication terminated by primary server")));
		ereport(ERROR,
				(errmsg("could not read xlog records: %s",
						PQerrorMessage(streamConn))));
	}
	if (rawlen < -1)
		ereport(ERROR,
				(errmsg("could not read xlog records: %s",
						PQerrorMessage(streamConn))));

	/* Each message must at least carry the leading XLogRecPtr header */
	if (rawlen < (int) sizeof(XLogRecPtr))
		ereport(ERROR,
				(errmsg("invalid WAL message received from primary")));

	/*
	 * Return received WAL records to caller. The start location is copied
	 * out with memcpy() rather than by casting the char buffer to
	 * XLogRecPtr *, which would rely on the buffer's alignment and violate
	 * C aliasing rules.
	 */
	memcpy(recptr, recvBuf, sizeof(XLogRecPtr));
	*buffer = recvBuf + sizeof(XLogRecPtr);
	*len = rawlen - (int) sizeof(XLogRecPtr);

	return true;
}
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to