Well, good news all round. v17 implements what I believe to be the final set of features for sync rep. This one I'm actually fairly happy with. It can be enjoyed best at DEBUG3.
The patch is a very light touch on a few areas of code, plus a chunk of specific code, all on the master side. Pretty straightforward, really. I'm sure problems will be found; it's not long since I completed this. Thanks to Daniel Farina for your help with patch assembly.

Which is just as well, because the other news is that I'm off on holiday for a few days, which is most inconvenient. I won't be committing this for at least a week and will be absent from the list. OTOH, I think it's ready for a final review and commit, so I'm OK if you commit it or OK if you leave it for me.

That's not the end of it. I can see a few things we could/should do in this release, but this includes all the must-do things. Docs could do with a little love also. So I expect work for me when I return.

Config Summary
==============

Most parameters are set on the primary. Set

  primary: synchronous_standby_names = 'node1, node2, node3'

which means that whichever of those standbys connects first will become the main synchronous standby. Servers arriving later will be potential standbys (standby standbys doesn't sound right...). synchronous_standby_names can change at reload.

Currently, the standby name is the application_name parameter set in the primary_conninfo.

When we set this for a client, or in postgresql.conf,

  primary: synchronous_replication = on

then we will wait at commit until the synchronous standby has reached the WAL location of our commit point. If the current synchronous standby dies then one of the other standbys will take over. (I think it would be a great idea to make the list a priority order, but I haven't had time to code that.)

If none of the standbys are available, then we don't wait at all if allow_standalone_primary is set. allow_standalone_primary can change at reload.

Whatever happens, if you set sync_replication_timeout_client then backends will give up waiting if some WALsender doesn't wake them quickly enough.

You can generally leave these parameters at their default settings:

  primary: sync_replication_timeout_client = 120s
  primary: allow_standalone_primary = on
  standby: wal_receiver_status_interval = 10s

--
Simon Riggs           http://www.2ndQuadrant.com/books/
PostgreSQL Development, 24x7 Support, Training and Services
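[Editor's note: as a concrete sketch of the setup summarised above. The host address, the 'repl' user, the 'node1' application_name and the sample transaction are illustrative assumptions only, not taken from the patch.]

  # primary: postgresql.conf
  synchronous_standby_names = 'node1, node2, node3'
  synchronous_replication = on              # may also be set per user, database or session
  sync_replication_timeout_client = 120s
  allow_standalone_primary = on

  # standby: recovery.conf -- application_name is the standby name matched
  # against synchronous_standby_names on the primary
  standby_mode = 'on'
  primary_conninfo = 'host=192.168.1.50 port=5432 user=repl application_name=node1'

  # standby: postgresql.conf
  wal_receiver_status_interval = 10s

  -- on the primary: commit one transaction asynchronously while the default stays synchronous
  BEGIN;
  SET LOCAL synchronous_replication TO off;
  INSERT INTO chat_messages (sender, body) VALUES ('alice', 'hi');   -- hypothetical table
  COMMIT;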
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index cee09c7..aad9b4e 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -2010,8 +2010,117 @@ SET ENABLE_SEQSCAN TO OFF; You should also consider setting <varname>hot_standby_feedback</> as an alternative to using this parameter. </para> + + <sect2 id="runtime-config-sync-rep"> + <title>Synchronous Replication</title> + + <para> + These settings control the behavior of the built-in + <firstterm>synchronous replication</> feature. + These parameters are set on the primary server that is + to send replication data to one or more standby servers. + </para> + + <variablelist> + <varlistentry id="guc-synchronous-replication" xreflabel="synchronous_replication"> + <term><varname>synchronous_replication</varname> (<type>boolean</type>)</term> + <indexterm> + <primary><varname>synchronous_replication</> configuration parameter</primary> + </indexterm> + <listitem> + <para> + Specifies whether transaction commit will wait for WAL records + to be replicated before the command returns a <quote>success</> + indication to the client. The default setting is <literal>off</>. + When <literal>on</>, there will be a delay while the client waits + for confirmation of successful replication. That delay will + increase depending upon the physical distance and network activity + between primary and standby. The commit wait will last until the + first reply from any standby. Multiple standby servers allow + increased availability and possibly increased performance as well. + </para> + + <para> + On the primary, this parameter can be changed at any time; the + behavior for any one transaction is determined by the setting in + effect when it commits. It is therefore possible, and useful, to have + some transactions replicate synchronously and others asynchronously. + For example, to make a single multistatement transaction commit + asynchronously when the default is synchronous replication, issue + <command>SET LOCAL synchronous_replication TO OFF</> within the + transaction. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-sync-replication-timeout-client" xreflabel="sync_replication_timeout_client"> + <term><varname>sync_replication_timeout_client</varname> (<type>integer</type>)</term> + <indexterm> + <primary><varname>sync_replication_timeout_client</> configuration parameter</primary> + </indexterm> + <listitem> + <para> + If the client has <varname>synchronous_replication</varname> set, + and a synchronous standby is currently available, + then the commit will wait for up to <varname>sync_replication_timeout_client</> + seconds before it returns a <quote>success</>. The commit will wait + forever for a confirmation when <varname>sync_replication_timeout_client</> + is set to -1. + </para> + <para> + If the client has <varname>synchronous_replication</varname> set, + and yet no synchronous standby is available when we commit, then the + setting of <varname>allow_standalone_primary</> determines whether + or not we wait. + </para> + </listitem> + </varlistentry> + + <varlistentry id="guc-allow-standalone-primary" xreflabel="allow_standalone_primary"> + <term><varname>allow_standalone_primary</varname> (<type>boolean</type>)</term> + <indexterm> + <primary><varname>allow_standalone_primary</> configuration parameter</primary> + </indexterm> + <listitem> + <para> + If <varname>allow_standalone_primary</> is set, then the server + can operate normally whether or not replication is active.
If + a client requests <varname>synchronous_replication</> and it is + not available, it will use asynchronous replication instead. + </para> + <para> + </para> </listitem> </varlistentry> + + <varlistentry id="guc-sync-standby-names" xreflabel="synchronous_standby_names"> + <term><varname>synchronous_standby_names</varname> (<type>string</type>)</term> + <indexterm> + <primary><varname>synchronous_standby_names</> configuration parameter</primary> + </indexterm> + <listitem> + <para> + Specifies a list of standby names that can become the sole + synchronous standby. Other standby servers that connect and are also on + the list become potential standbys. If the current synchronous standby + goes away it will be replaced with one of the potential standbys. + Specifying more than one standby name can allow very high availability. + </para> + <para> + The standby name is currently taken as the application_name of the + standby, as set in the primary_conninfo on the standby. Names are + not enforced for uniqueness, though clearly that can lead to + confusion. Specifying multiple standbys with the same name does not + allow more than one standby to be the current synchronous standby. + </para> + <para> + If a standby is removed from the list of servers then it will stop + being the synchronous standby, allowing another to take its place. + Standbys may also be added to the list without restarting the server. + </para> + </listitem> + </varlistentry> + </variablelist> </sect2> diff --git a/doc/src/sgml/high-availability.sgml b/doc/src/sgml/high-availability.sgml index 37ba43b..d2710dd 100644 --- a/doc/src/sgml/high-availability.sgml +++ b/doc/src/sgml/high-availability.sgml @@ -875,6 +875,209 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass' </sect3> </sect2> + <sect2 id="synchronous-replication"> + <title>Synchronous Replication</title> + + <indexterm zone="high-availability"> + <primary>Synchronous Replication</primary> + </indexterm> + + <para> + <productname>PostgreSQL</> streaming replication is asynchronous by + default. If the primary server + crashes then some transactions that were committed may not have been + replicated to the standby server, causing data loss. The amount + of data loss is proportional to the replication delay at the time of + failover. + </para> + + <para> + Synchronous replication offers the ability to confirm that all changes + made by a transaction have been transferred to one synchronous standby + server. This extends the standard level of durability + offered by a transaction commit. This level of protection is referred + to as 2-safe replication in computer science theory. + </para> + + <para> + Synchronous replication works in the following way. When requested, + the commit of a write transaction will wait until confirmation is + received that the commit has been written to the transaction log on disk + of both the primary and standby server. The only possibility that data + can be lost is if both the primary and the standby suffer crashes at the + same time. This can provide a much higher level of durability if the + sysadmin is cautious about the placement and management of the two servers. + Waiting for confirmation increases the user's confidence that the changes + will not be lost in the event of server crashes but it also necessarily + increases the response time for the requesting transaction. The minimum + wait time is the roundtrip time between primary and standby.
+ </para> + + <para> + Read-only transactions and transaction rollbacks need not wait for + replies from standby servers. Subtransaction commits do not wait for + responses from standby servers, only top-level commits. Long-running + actions such as data loading or index building do not wait + until the very final commit message. + </para> + + <sect3 id="synchronous-replication-config"> + <title>Basic Configuration</title> + + <para> + Synchronous replication will be active if appropriate options are + enabled on both the primary and at least one standby server. + </para> + + <para> + On the primary server we need to set + +<programlisting> +synchronous_standby_names = 'bill, ted' +synchronous_replication = on +</programlisting> + + and on the standby server we need to set a non-zero value for + +<programlisting> +wal_receiver_status_interval = 10s +</programlisting> + + On the primary, <varname>synchronous_replication</> can be set + for particular users or databases, or dynamically by application + programs. On the standby, <varname>wal_receiver_status_interval</> + can only be set in <filename>postgresql.conf</> or on the server + command line. + </para> + + </sect3> + + <sect3 id="synchronous-replication-performance"> + <title>Planning for Performance</title> + + <para> + Synchronous replication usually requires carefully planned and placed + standby servers to ensure applications perform acceptably. Waiting + doesn't utilise system resources, but transaction locks continue to be + held until the transfer is confirmed. As a result, incautious use of + synchronous replication will reduce performance for database + applications because of increased response times and higher contention. + </para> + + <para> + <productname>PostgreSQL</> allows the application developer + to specify the durability level required via replication. This can be + specified for the system overall, though it can also be specified for + specific users or connections, or even individual transactions. + </para> + + <para> + For example, in an application workload 10% of changes might be + important customer details, while + 90% of changes are less important data that the business can more + easily survive if it is lost, such as chat messages between users. + </para> + + <para> + With synchronous replication options specified at the application level + (on the primary) we can offer sync rep for the most important changes, + without slowing down the bulk of the total workload. Application-level + options are an important and practical tool for allowing the benefits of + synchronous replication for high-performance applications. + </para> + + <para> + You should consider that the network bandwidth must be higher than + the rate of generation of WAL data. + </para> + + </sect3> + + <sect3 id="synchronous-replication-ha"> + <title>Planning for High Availability</title> + + <para> + The easiest and safest method of gaining High Availability using + synchronous replication is to configure at least two standby servers. + To understand why, we need to examine what can happen when you lose all + standby servers. + </para> + + <para> + Commits made when synchronous_replication is set will wait until at + least one standby responds. The response may never occur if the last, + or only, standby should crash or the network drops. What should we do in + that situation?
+ </para> + + <para> + Sitting and waiting will typically cause operational problems + because it is an effective outage of the primary server should all + sessions end up waiting. In contrast, allowing the primary server to + continue processing write transactions in the absence of a standby + puts those latest data changes at risk. So in this situation there + is a direct choice between database availability and the potential + durability of the data it contains. How we handle this situation + is controlled by <varname>allow_standalone_primary</>. The default + setting is <literal>on</>, allowing processing to continue, though + there is no recommended setting. Choosing the best setting for + <varname>allow_standalone_primary</> is a difficult decision and best + left to those with combined business responsibility for both data and + applications. The difficulty of this choice is the reason why we + recommend that you reduce the possibility of this situation occurring + by using multiple standby servers. + </para> + + <para> + A user will stop waiting once <varname>sync_replication_timeout_client</> + has been reached for their specific session. Users are not waiting for + a specific standby to reply; they are waiting for a reply from any + standby, so the unavailability of any one standby is not significant + to a user. It is possible for user sessions to hit the timeout even though + standbys are communicating normally. In that case, the setting of + <varname>sync_replication_timeout_client</> is probably too low. + </para> + + <para> + When a standby first attaches to the primary, it may not be properly + synchronized. The standby is only able to become a synchronous standby + once it has become synchronized, or "caught up" with the primary. + The catch-up duration may be long immediately after the standby has + been created. If the standby is shut down, then the catch-up period + will increase according to the length of time the standby has been + down. You are advised to make sure <varname>allow_standalone_primary</> + is not set during the initial catch-up period. + </para> + + <para> + If the primary crashes while commits are waiting for acknowledgement, those + transactions will be marked fully committed if the primary database + recovers, no matter how <varname>allow_standalone_primary</> is set. + There is no way to be certain that all standbys have received all + outstanding WAL data at the time of the crash of the primary. Some + transactions may not show as committed on the standby, even though + they show as committed on the primary. The guarantee we offer is that + the application will not receive explicit acknowledgement of the + successful commit of a transaction until the WAL data is known to be + safely received by the standby. Hence this mechanism is technically + "semi-synchronous" rather than "fully synchronous" replication. Note + that replication would still not be fully synchronous even if we waited for + all standby servers, though this would reduce availability, as + described previously. + </para> + + <para> + If you need to re-create a standby server while transactions are + waiting, make sure that the commands to run pg_start_backup() and + pg_stop_backup() are run in a session with + synchronous_replication = off, otherwise those requests will wait + forever for the standby to appear.
+ </para> + + </sect3> + </sect2> </sect1> <sect1 id="warm-standby-failover"> diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index 287ad26..eb3cd6f 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -56,6 +56,7 @@ #include "pg_trace.h" #include "pgstat.h" #include "replication/walsender.h" +#include "replication/syncrep.h" #include "storage/fd.h" #include "storage/predicate.h" #include "storage/procarray.h" @@ -2030,6 +2031,14 @@ RecordTransactionCommitPrepared(TransactionId xid, MyProc->inCommit = false; END_CRIT_SECTION(); + + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(recptr); } /* diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index a0170b4..5f73226 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -37,6 +37,7 @@ #include "miscadmin.h" #include "pgstat.h" #include "replication/walsender.h" +#include "replication/syncrep.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/lmgr.h" @@ -1055,7 +1056,7 @@ RecordTransactionCommit(void) * if all to-be-deleted tables are temporary though, since they are lost * anyway if we crash.) */ - if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0) + if ((wrote_xlog && XactSyncCommit) || forceSyncCommit || nrels > 0 || SyncRepRequested()) { /* * Synchronous commit case: @@ -1125,6 +1126,14 @@ RecordTransactionCommit(void) /* Compute latestXid while we have the child XIDs handy */ latestXid = TransactionIdLatest(xid, nchildren, children); + /* + * Wait for synchronous replication, if required. + * + * Note that at this stage we have marked clog, but still show as + * running in the procarray and continue to hold locks. + */ + SyncRepWaitForLSN(XactLastRecEnd); + /* Reset XactLastRecEnd until the next transaction writes something */ XactLastRecEnd.xrecoff = 0; diff --git a/src/backend/postmaster/autovacuum.c b/src/backend/postmaster/autovacuum.c index 7307c41..2171b50 100644 --- a/src/backend/postmaster/autovacuum.c +++ b/src/backend/postmaster/autovacuum.c @@ -1527,6 +1527,14 @@ AutoVacWorkerMain(int argc, char *argv[]) SetConfigOption("statement_timeout", "0", PGC_SUSET, PGC_S_OVERRIDE); /* + * Force synchronous replication off to allow regular maintenance even + * if we are waiting for standbys to connect. This is important to + * ensure we aren't blocked from performing anti-wraparound tasks + * when allow_standalone_primary = false + */ + SetConfigOption("synchronous_replication", "off", PGC_SUSET, PGC_S_OVERRIDE); + + /* * Get the info about the database we're going to work on. */ LWLockAcquire(AutovacuumLock, LW_EXCLUSIVE); diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 42c6eaf..3fe490e 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -13,7 +13,7 @@ top_builddir = ../../.. 
include $(top_builddir)/src/Makefile.global OBJS = walsender.o walreceiverfuncs.o walreceiver.o basebackup.o \ - repl_gram.o + repl_gram.o syncrep.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c new file mode 100644 index 0000000..14f426b --- /dev/null +++ b/src/backend/replication/syncrep.c @@ -0,0 +1,650 @@ +/*------------------------------------------------------------------------- + * + * syncrep.c + * + * Synchronous replication is new as of PostgreSQL 9.1. + * + * If requested, transaction commits wait until their commit LSN is + * acknowledged by the standby, or the wait hits timeout. + * + * This module contains the code for waiting and release of backends. + * All code in this module executes on the primary. The core streaming + * replication transport remains within WALreceiver/WALsender modules. + * + * The essence of this design is that it isolates all logic about + * waiting/releasing onto the primary. The primary defines which standbys + * it wishes to wait for. The standby is completely unaware of the + * durability requirements of transactions on the primary, reducing the + * complexity of the code and streamlining both standby operations and + * network bandwidth because there is no requirement to ship + * per-transaction state information. + * + * The bookkeeping approach we take is that a commit is either synchronous + * or not synchronous (async). If it is async, we just fastpath out of + * here. If it is sync, then in 9.1 we wait for the flush location on the + * standby before releasing the waiting backend. Further complexity + * in that interaction is expected in later releases. + * + * The best-performing way to manage the waiting backends is to have a + * single ordered queue of waiting backends, so that we can avoid + * searching through all waiters each time we receive a reply. + * + * Starting sync replication is a multi-stage process. First, the standby + * must be a potential synchronous standby. Next, we must have caught up + * with the primary; that may take some time. If there is no current + * synchronous standby then the WALsender will offer a sync rep service.
+ * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * IDENTIFICATION + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> + +#include "access/xact.h" +#include "access/xlog_internal.h" +#include "miscadmin.h" +#include "postmaster/autovacuum.h" +#include "replication/syncrep.h" +#include "replication/walsender.h" +#include "storage/latch.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "storage/proc.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/guc_tables.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" + +/* User-settable parameters for sync rep */ +bool sync_rep_mode = false; /* Only set in user backends */ +int sync_rep_timeout_client = 120; /* Only set in user backends */ +bool allow_standalone_primary; +char *SyncRepStandbyNames; + + +#define IsOnSyncRepQueue() (MyProc->lwWaiting) + +static void SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN); +static void SyncRepRemoveFromQueue(void); +static void SyncRepAddToQueue(void); +static bool SyncRepServiceAvailable(void); +static long SyncRepGetWaitTimeout(void); + +static bool IsPotentialSyncRepStandby(void); +static int SyncRepWakeQueue(void); + + +/* + * =========================================================== + * Synchronous Replication functions for normal user backends + * =========================================================== + */ + +/* + * Wait for synchronous replication, if requested by user. + */ +extern void +SyncRepWaitForLSN(XLogRecPtr XactCommitLSN) +{ + /* + * Fast exit if user has not requested sync replication, or + * streaming replication is inactive in this server. + */ + if (!SyncRepRequested() || max_wal_senders == 0) + return; + + if (allow_standalone_primary) + { + /* + * Check that the service level we want is available. + * If not, downgrade the service level to async. + */ + if (SyncRepServiceAvailable()) + SyncRepWaitOnQueue(XactCommitLSN); + } + else + { + /* + * Wait, even if service is not yet available. + * Sounds weird, but this mode exists to provide + * higher levels of protection. + */ + SyncRepWaitOnQueue(XactCommitLSN); + } +} + +/* + * Wait for specified LSN to be confirmed at the requested level + * of durability. Each proc has its own wait latch, so we perform + * a normal latch check/wait loop here. + */ +static void +SyncRepWaitOnQueue(XLogRecPtr XactCommitLSN) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + TimestampTz now = GetCurrentTransactionStopTimestamp(); + long timeout = SyncRepGetWaitTimeout(); + char *new_status = NULL; + const char *old_status; + int len; + + ereport(DEBUG3, + (errmsg("synchronous replication waiting for %X/%X starting at %s", + XactCommitLSN.xlogid, + XactCommitLSN.xrecoff, + timestamptz_to_str(GetCurrentTransactionStopTimestamp())))); + + for (;;) + { + ResetLatch(&MyProc->waitLatch); + + /* + * First time through, add ourselves to the appropriate queue. + */ + if (!IsOnSyncRepQueue()) + { + SpinLockAcquire(&queue->qlock); + if (XLByteLE(XactCommitLSN, queue->lsn)) + { + /* No need to wait */ + SpinLockRelease(&queue->qlock); + return; + } + + /* + * Set our waitLSN so WALSender will know when to wake us. + * We set this before we add ourselves to queue, so that + * any proc on the queue can be examined freely without + * taking a lock on each process in the queue. 
+ */ + MyProc->waitLSN = XactCommitLSN; + SyncRepAddToQueue(); + SpinLockRelease(&queue->qlock); + + /* + * Alter ps display to show waiting for sync rep. + */ + old_status = get_ps_display(&len); + new_status = (char *) palloc(len + 21 + 1); + memcpy(new_status, old_status, len); + strcpy(new_status + len, " waiting for sync rep"); + set_ps_display(new_status, false); + new_status[len] = '\0'; /* truncate off " waiting" */ + } + else + { + bool release = false; + bool timeout = false; + + SpinLockAcquire(&queue->qlock); + + /* + * Check the LSN on our queue and if it's moved far enough then + * remove us from the queue. First time through this is + * unlikely to be far enough, yet is possible. Next time we are + * woken we should be more lucky. + */ + if (XLByteLE(XactCommitLSN, queue->lsn)) + release = true; + else if (timeout > 0 && + TimestampDifferenceExceeds(GetCurrentTransactionStopTimestamp(), + now, timeout)) + { + release = true; + timeout = true; + } + + if (release) + { + SyncRepRemoveFromQueue(); + SpinLockRelease(&queue->qlock); + + if (new_status) + { + /* Reset ps display */ + set_ps_display(new_status, false); + pfree(new_status); + } + + /* + * Our response to the timeout is to simply post a NOTICE and + * then return to the user. The commit has happened, we just + * haven't been able to verify it has been replicated in the + * way requested. + */ + if (timeout) + ereport(NOTICE, + (errmsg("synchronous replication timeout at %s", + timestamptz_to_str(now)))); + else + ereport(DEBUG3, + (errmsg("synchronous replication wait complete at %s", + timestamptz_to_str(now)))); + return; + } + + SpinLockRelease(&queue->qlock); + } + + WaitLatch(&MyProc->waitLatch, timeout); + now = GetCurrentTimestamp(); + } +} + +/* + * Remove myself from sync rep wait queue. + * + * Assume on queue at start; will not be on queue at end. + * Queue is already locked at start and remains locked on exit. + */ +void +SyncRepRemoveFromQueue(void) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + PGPROC *proc = queue->head; + + Assert(IsOnSyncRepQueue()); + + proc = queue->head; + + if (proc == MyProc) + { + if (MyProc->lwWaitLink == NULL) + { + /* + * We were the only waiter on the queue. Reset head and tail. + */ + Assert(queue->tail == MyProc); + queue->head = NULL; + queue->tail = NULL; + } + else + /* + * Move head to next proc on the queue. + */ + queue->head = MyProc->lwWaitLink; + } + else + { + while (proc->lwWaitLink != NULL) + { + /* Are we the next proc in our traversal of the queue? */ + if (proc->lwWaitLink == MyProc) + { + /* + * Remove ourselves from middle of queue. + * No need to touch head or tail. + */ + proc->lwWaitLink = MyProc->lwWaitLink; + } + + if (proc->lwWaitLink == NULL) + elog(WARNING, "could not locate ourselves on wait queue"); + proc = proc->lwWaitLink; + } + + if (proc->lwWaitLink == NULL) /* At tail */ + { + Assert(proc == MyProc); + /* Remove ourselves from tail of queue */ + Assert(queue->tail == MyProc); + queue->tail = proc; + proc->lwWaitLink = NULL; + } + } + MyProc->lwWaitLink = NULL; + MyProc->lwWaiting = false; +} + +/* + * Add myself to sync rep wait queue. + * + * Assume not on queue at start; will be on queue at end. + * Queue is already locked at start and remains locked on exit. 
+ */ +static void +SyncRepAddToQueue(void) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + PGPROC *tail = queue->tail; + + /* + * Add myself to tail of wait queue. + */ + if (tail == NULL) + { + queue->head = MyProc; + queue->tail = MyProc; + } + else + { + /* + * XXX extra code needed here to maintain sorted invariant. + * Our approach should be same as racing car - slow in, fast out. + */ + Assert(tail->lwWaitLink == NULL); + tail->lwWaitLink = MyProc; + } + queue->tail = MyProc; + + MyProc->lwWaiting = true; + MyProc->lwWaitLink = NULL; +} + +/* + * Dynamically decide the sync rep wait mode. It may seem a trifle + * wasteful to do this for every transaction but we need to do this + * so we can cope sensibly with standby disconnections. It's OK to + * spend a few cycles here anyway, since while we're doing this the + * WALSender will be sending the data we want to wait for, so this + * is dead time and the user has requested to wait anyway. + */ +static bool +SyncRepServiceAvailable(void) +{ + bool result = false; + + SpinLockAcquire(&WalSndCtl->ctlmutex); + result = WalSndCtl->sync_rep_service_available; + SpinLockRelease(&WalSndCtl->ctlmutex); + + return result; +} + +/* + * Return a value that we can use directly in WaitLatch(). We need to + * handle special values, plus convert from seconds to microseconds. + * + */ +static long +SyncRepGetWaitTimeout(void) +{ + if (sync_rep_timeout_client <= 0) + return -1L; + + return 1000000L * sync_rep_timeout_client; +} + +void +SyncRepCleanupAtProcExit(int code, Datum arg) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + + if (IsOnSyncRepQueue()) + { + SpinLockAcquire(&queue->qlock); + SyncRepRemoveFromQueue(); + SpinLockRelease(&queue->qlock); + } + + if (MyProc != NULL && MyProc->ownLatch) + { + DisownLatch(&MyProc->waitLatch); + MyProc->ownLatch = false; + } +} + +/* + * =========================================================== + * Synchronous Replication functions for wal sender processes + * =========================================================== + */ + +/* + * Check if we are in the list of sync standbys. + * + * Compare the parameter SyncRepStandbyNames against the application_name + * for this WALSender. + */ +static bool +IsPotentialSyncRepStandby(void) +{ + char *rawstring; + List *elemlist; + ListCell *l; + + /* Need a modifiable copy of string */ + rawstring = pstrdup(SyncRepStandbyNames); + + /* Parse string into list of identifiers */ + if (!SplitIdentifierString(rawstring, ',', &elemlist)) + { + /* syntax error in list */ + pfree(rawstring); + list_free(elemlist); + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid list syntax for parameter \"synchronous_standby_names\""))); + return false; + } + + foreach(l, elemlist) + { + char *standby_name = (char *) lfirst(l); + + if (pg_strcasecmp(standby_name, application_name) == 0) + return true; + } + + return false; +} + +/* + * Take any action required to initialise sync rep state from config + * data. Called at WALSender startup and after each SIGHUP. + */ +void +SyncRepInitConfig(void) +{ + bool sync_standby = IsPotentialSyncRepStandby(); + + /* + * Determine if we are a potential sync standby and remember the result + * for handling replies from standby. 
+ */ + if (!MyWalSnd->potential_sync_standby && sync_standby) + { + MyWalSnd->potential_sync_standby = true; + ereport(DEBUG1, + (errmsg("standby \"%s\" is a potential synchronous standby", + application_name))); + } + else if (MyWalSnd->potential_sync_standby && !sync_standby) + { + /* + * We're no longer a potential sync standby. + */ + MyWalSnd->potential_sync_standby = false; + + /* + * Stop providing the sync rep service, to let another take over. + */ + if (MyWalSnd->sync_rep_service) + { + /* + * Update state for this WAL sender. + */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sync_rep_service = false; + SpinLockRelease(&walsnd->mutex); + } + + /* + * Stop providing the sync rep service, even if there are + * waiting backends. + */ + { + SpinLockAcquire(&WalSndCtl->ctlmutex); + WalSndCtl->sync_rep_service_available = false; + SpinLockRelease(&WalSndCtl->ctlmutex); + } + + ereport(DEBUG1, + (errmsg("standby \"%s\" is no longer the synchronous replication standby", + application_name))); + } + } +} + +/* + * Update the LSNs on each queue based upon our latest state. This + * implements a simple policy of first-valid-standby-releases-waiter. + * + * Other policies are possible, which would change what we do here and what + * perhaps also which information we store as well. + */ +void +SyncRepReleaseWaiters(void) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + + /* + * If this WALSender is serving a standby that is not on the list of + * potential standbys then we have nothing to do. + */ + if (!MyWalSnd->potential_sync_standby) + return; + + /* + * We're a potential sync standby. If we aren't yet offering a sync + * rep service, check whether we need to begin offering that service. + * We check this dynamically to ensure that we can continue to offer + * a service if we have multiple potential standbys and the current + * sync standby fails. + * + * We don't attempt to enable sync rep service during a base backup since + * during that action we aren't sending WAL at all, so there cannot be + * any meaningful replies. We don't enable sync rep service while we + * are still in catchup mode either, since clients might experience an + * extended wait (perhaps hours) if they waited at that point. + */ + if (!MyWalSnd->sync_rep_service && + MyWalSnd->state == WALSNDSTATE_STREAMING) + { + if (SyncRepServiceAvailable()) + { + /* + * Another WALSender is already providing the sync rep service. + */ + return; + } + else + { + bool enable_service = false; + + /* + * We're a potential sync standby and there isn't currently + * a sync standby, so we're now going to become one. Watch for + * race conditions here. + */ + { + SpinLockAcquire(&WalSndCtl->ctlmutex); + if (!WalSndCtl->sync_rep_service_available) + { + WalSndCtl->sync_rep_service_available = true; + enable_service = true; + } + SpinLockRelease(&WalSndCtl->ctlmutex); + } + + /* + * Another WALSender just is already providing the sync rep service. + */ + if (!enable_service) + return; + + ereport(DEBUG1, + (errmsg("standby \"%s\" is now the synchronous replication standby", + application_name))); + + /* + * Update state for this WAL sender. + */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sync_rep_service = true; + SpinLockRelease(&walsnd->mutex); + } + } + } + + /* + * Maintain queue LSNs and release wakers. 
+ */ + { + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + int numprocs = 0; + + /* + * Lock the queue. Not really necessary with just one sync standby + * but it makes clear what needs to happen. + */ + SpinLockAcquire(&queue->qlock); + if (XLByteLT(queue->lsn, MyWalSnd->flush)) + { + /* + * Set the lsn first so that when we wake backends they will + * release up to this location. + */ + queue->lsn = MyWalSnd->flush; + numprocs = SyncRepWakeQueue(); + } + SpinLockRelease(&queue->qlock); + + elog(DEBUG3, "released %d procs up to %X/%X", + numprocs, + MyWalSnd->flush.xlogid, + MyWalSnd->flush.xrecoff); + } +} + +/* + * Walk queue from head setting the latches of any procs that need + * to be woken. We don't modify the queue, we leave that for individual + * procs to release themselves. + * + * Must hold spinlock on queue. + */ +static int +SyncRepWakeQueue(void) +{ + volatile WalSndCtlData *walsndctl = WalSndCtl; + volatile SyncRepQueue *queue = &(walsndctl->sync_rep_queue); + PGPROC *proc = queue->head; + int numprocs = 0; + + /* fast exit for empty queue */ + if (proc == NULL) + return 0; + + for (; proc != NULL; proc = proc->lwWaitLink) + { + /* + * Assume the queue is ordered by LSN + */ + if (XLByteLT(queue->lsn, proc->waitLSN)) + return numprocs; + + numprocs++; + SetLatch(&proc->waitLatch); + } + + return numprocs; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index eb99246..cf39b1c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -66,7 +66,7 @@ WalSndCtlData *WalSndCtl = NULL; /* My slot in the shared memory array */ -static WalSnd *MyWalSnd = NULL; +WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ @@ -174,6 +174,8 @@ WalSenderMain(void) SpinLockRelease(&walsnd->mutex); } + SyncRepInitConfig(); + /* Main loop of walsender */ return WalSndLoop(); } @@ -379,6 +381,16 @@ StartReplication(StartReplicationCmd * cmd) */ WalSndSetState(WALSNDSTATE_CATCHUP); + /* + * When we first start replication the standby will be behind the primary. + * For some applications, for example, synchronous replication, it is + * important to have a clear state for this initial catchup mode, so we + * can trigger actions when we change streaming state later. We may stay + * in this state for a long time, which is exactly why we want to be + * able to monitor whether or not we are still here. + */ + WalSndSetState(WALSNDSTATE_CATCHUP); + /* Send a CopyBothResponse message, and start streaming */ pq_beginmessage(&buf, 'W'); pq_sendbyte(&buf, 0); @@ -584,6 +596,8 @@ ProcessStandbyReplyMessage(void) walsnd->apply = reply.apply; SpinLockRelease(&walsnd->mutex); } + + SyncRepReleaseWaiters(); } /* @@ -700,6 +714,7 @@ WalSndLoop(void) { got_SIGHUP = false; ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); } /* @@ -771,7 +786,12 @@ WalSndLoop(void) * that point might wait for some time. 
*/ if (MyWalSnd->state == WALSNDSTATE_CATCHUP && caughtup) + { + ereport(DEBUG1, + (errmsg("standby \"%s\" has now caught up with primary", + application_name))); WalSndSetState(WALSNDSTATE_STREAMING); + } ProcessRepliesIfAny(); } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index afaf599..62d1d6b 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -39,6 +39,7 @@ #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" +#include "replication/syncrep.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" @@ -196,6 +197,7 @@ InitProcGlobal(void) PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs; ProcGlobal->freeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* @@ -214,6 +216,7 @@ InitProcGlobal(void) PGSemaphoreCreate(&(procs[i].sem)); procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs; ProcGlobal->autovacFreeProcs = &procs[i]; + InitSharedLatch(&procs[i].waitLatch); } /* @@ -224,6 +227,7 @@ InitProcGlobal(void) { AuxiliaryProcs[i].pid = 0; /* marks auxiliary proc as not in use */ PGSemaphoreCreate(&(AuxiliaryProcs[i].sem)); + InitSharedLatch(&procs[i].waitLatch); } /* Create ProcStructLock spinlock, too */ @@ -326,6 +330,13 @@ InitProcess(void) SHMQueueInit(&(MyProc->myProcLocks[i])); MyProc->recoveryConflictPending = false; + /* Initialise the waitLSN for sync rep */ + MyProc->waitLSN.xlogid = 0; + MyProc->waitLSN.xrecoff = 0; + + OwnLatch((Latch *) &MyProc->waitLatch); + MyProc->ownLatch = true; + /* * We might be reusing a semaphore that belonged to a failed process. So * be careful and reinitialize its value here. (This is not strictly @@ -365,6 +376,7 @@ InitProcessPhase2(void) /* * Arrange to clean that up at backend exit. 
*/ + on_shmem_exit(SyncRepCleanupAtProcExit, 0); on_shmem_exit(RemoveProcFromArray, 0); } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 55cbf75..16dced1 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -55,6 +55,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" +#include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" @@ -754,6 +755,14 @@ static struct config_bool ConfigureNamesBool[] = true, NULL, NULL }, { + {"synchronous_replication", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Requests synchronous replication."), + NULL + }, + &sync_rep_mode, + false, NULL, NULL + }, + { {"zero_damaged_pages", PGC_SUSET, DEVELOPER_OPTIONS, gettext_noop("Continues processing past damaged page headers."), gettext_noop("Detection of a damaged page header normally causes PostgreSQL to " @@ -1270,6 +1279,16 @@ static struct config_bool ConfigureNamesBool[] = }, { + {"allow_standalone_primary", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("Allow users to proceed without waiting if they request" + "synchronous replication and it is not available."), + NULL + }, + &allow_standalone_primary, + true, NULL, NULL + }, + + { {"hot_standby", PGC_POSTMASTER, WAL_STANDBY_SERVERS, gettext_noop("Allows connections and queries during recovery."), NULL @@ -2161,6 +2180,16 @@ static struct config_int ConfigureNamesInt[] = }, { + {"sync_replication_timeout_client", PGC_USERSET, WAL_REPLICATION, + gettext_noop("Clients waiting for confirmation will timeout after this duration."), + NULL, + GUC_UNIT_S + }, + &sync_rep_timeout_client, + 120, -1, INT_MAX, NULL, NULL + }, + + { {"track_activity_query_size", PGC_POSTMASTER, RESOURCES_MEM, gettext_noop("Sets the size reserved for pg_stat_activity.current_query, in bytes."), NULL, @@ -2717,6 +2746,16 @@ static struct config_string ConfigureNamesString[] = }, { + {"synchronous_standby_names", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("List of potential standby names to synchronise with."), + NULL, + GUC_LIST_INPUT | GUC_IS_NAME + }, + &SyncRepStandbyNames, + "", NULL, NULL + }, + + { {"default_text_search_config", PGC_USERSET, CLIENT_CONN_LOCALE, gettext_noop("Sets default text search configuration."), NULL diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 6726733..314b36d 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -184,7 +184,15 @@ #archive_timeout = 0 # force a logfile segment switch after this # number of seconds; 0 disables -# - Streaming Replication - +# - Replication - User Settings + +#synchronous_replication = off # does commit wait for reply from standby +#replication_timeout_client = 120 # -1 means wait forever + +# - Streaming Replication - Server Settings + +#allow_standalone_primary = on # sync rep parameter +#synchronous_standby_names = '' # list of server names for sync rep #max_wal_senders = 0 # max number of walsender processes # (change requires restart) diff --git a/src/include/replication/syncrep.h b/src/include/replication/syncrep.h new file mode 100644 index 0000000..a789856 --- /dev/null +++ b/src/include/replication/syncrep.h @@ -0,0 +1,56 @@ +/*------------------------------------------------------------------------- + * + * syncrep.h + * Exports from replication/syncrep.c. 
+ * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * $PostgreSQL$ + * + *------------------------------------------------------------------------- + */ +#ifndef _SYNCREP_H +#define _SYNCREP_H + +#include "access/xlog.h" +#include "storage/proc.h" +#include "storage/shmem.h" +#include "storage/spin.h" + +#define SyncRepRequested() (sync_rep_mode) + +/* + * Each synchronous rep queue lives in the WAL sender shmem area. + */ +typedef struct SyncRepQueue +{ + /* + * Current location of the head of the queue. All waiters should have + * a waitLSN that follows this value, or they are currently being woken + * to remove themselves from the queue. + */ + XLogRecPtr lsn; + + PGPROC *head; + PGPROC *tail; + + slock_t qlock; /* locks shared variables shown above */ +} SyncRepQueue; + +/* user-settable parameters for synchronous replication */ +extern bool sync_rep_mode; +extern int sync_rep_timeout_client; +extern bool allow_standalone_primary; +extern char *SyncRepStandbyNames; + +/* called by user backend */ +extern void SyncRepWaitForLSN(XLogRecPtr XactCommitLSN); + +/* called by wal sender */ +extern void SyncRepInitConfig(void); +extern void SyncRepReleaseWaiters(void); + +/* callback at exit */ +extern void SyncRepCleanupAtProcExit(int code, Datum arg); + +#endif /* _SYNCREP_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index 5843307..bd67622 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -15,6 +15,7 @@ #include "access/xlog.h" #include "nodes/nodes.h" #include "storage/latch.h" +#include "replication/syncrep.h" #include "storage/spin.h" @@ -52,11 +53,33 @@ typedef struct WalSnd * to do. */ Latch latch; + + /* + * Is this WALSender currently offering a sync replication service? + */ + bool sync_rep_service; + + /* + * Is this WALSender on the list of sync standbys? + */ + bool potential_sync_standby; } WalSnd; +extern WalSnd *MyWalSnd; + /* There is one WalSndCtl struct for the whole database cluster */ typedef struct { + /* + * Sync rep wait queue, which maintains the invariant that the + * individual queues are sorted on LSN. + */ + SyncRepQueue sync_rep_queue; + + bool sync_rep_service_available; + + slock_t ctlmutex; /* locks shared variables shown above */ + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ } WalSndCtlData; diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 78dbade..27b57c8 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -14,6 +14,8 @@ #ifndef _PROC_H_ #define _PROC_H_ +#include "access/xlog.h" +#include "storage/latch.h" #include "storage/lock.h" #include "storage/pg_sema.h" #include "utils/timestamp.h" @@ -115,6 +117,11 @@ struct PGPROC LOCKMASK heldLocks; /* bitmask for lock types already held on this * lock object by this backend */ + /* Info to allow us to wait for synchronous replication, if needed. */ + Latch waitLatch; + XLogRecPtr waitLSN; /* waiting for this LSN or higher */ + bool ownLatch; /* do we own the above latch? */ + /* * All PROCLOCK objects for locks held or awaited by this backend are * linked into one of these lists, according to the partition number of