Hi hackers,
A few days ago I finished my work on the WAITLSN utility statement, so
I'd like to share it.
Introduction
============
Our clients who run 9.5 with asynchronous master-slave replication
asked for a wait mechanism on the slave side, to prevent the situation
where the slave handles a query that needs data (an LSN) that has been
received and flushed, but not yet replayed.
Problem description
===================
The implementation:
Must handle the wait mechanism using pg_sleep(), so as not to load the system.
Must avoid race conditions when different backends wait for
different LSNs.
Must not take a snapshot of the DB, to avoid trouble with a sudden minXID change.
Must have an optional timeout parameter in case LSN traffic has stalled.
Must release on postmaster death or interrupts.
Implementation
==============
To avoid trouble with snapshots, WAITLSN was implemented as a utility
statement; this allows us to bypass the snapshot-taking mechanism.
We tried different variants, and the most effective was to use latches.
To handle interprocess interaction, all latches are stored in shared
memory, and to cope with race conditions each latch is protected by a
spinlock.
The timeout is an optional parameter, specified in milliseconds.
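To illustrate the optional timeout: a wait loop that can be woken many times before the target LSN is replayed has to shrink its remaining budget on every wakeup. Below is a minimal sketch of that bookkeeping. It assumes timestamps are 64-bit microsecond counts (as PostgreSQL's TimestampTz is when built with integer datetimes) and that the timeout is given in milliseconds. remaining_timeout_ms is a hypothetical helper, not part of the patch.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical helper (not part of the patch): how many milliseconds of
 * the timeout are still left, given the time the wait started and the
 * current time, both in microseconds.
 */
static long
remaining_timeout_ms(int64_t start_us, int64_t now_us, long timeout_ms)
{
    int64_t elapsed_ms;

    assert(timeout_ms > 0);

    /* Clock did not advance (or went backwards): nothing is used up. */
    if (now_us <= start_us)
        return timeout_ms;

    /* Convert microseconds to milliseconds before comparing. */
    elapsed_ms = (now_us - start_us) / 1000;

    if (elapsed_ms >= timeout_ms)
        return 0;               /* budget exhausted, stop waiting */

    return timeout_ms - (long) elapsed_ms;
}
```

Measuring from a fixed start time, rather than subtracting a small delta on each wakeup, keeps rounding errors from accumulating when the loop is woken often.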
What works
==========
It works well even with large timeout or wait-period values, but of
course there might be things I've overlooked.
How to use it
=============
WAITLSN 'LSN' [, timeout in ms];
-- Wait until LSN 0/303EC60 is replayed, or until 10 seconds have passed.
WAITLSN '0/303EC60', 10000;
-- The same, without a timeout.
WAITLSN '0/303EC60';
Note: WAITLSN also returns on postmaster death or on an interrupt if
either happens before the LSN is replayed or the timeout expires.
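The LSN argument uses the usual textual form: two hexadecimal numbers separated by a slash, giving the high and low 32 bits of the position. The following is a simplified sketch of turning that text into a 64-bit value and comparing it with the replay position. parse_lsn and lsn_reached are hypothetical names used only for illustration; the patch itself relies on pg_lsn_in for parsing, which validates its input more strictly than this sketch does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/*
 * Hypothetical helper: parse "XXXXXXXX/XXXXXXXX" into a 64-bit LSN.
 * Returns false if the text does not match or has trailing characters.
 */
static bool
parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;
    int          consumed = 0;

    assert(text != NULL && lsn != NULL);

    /* Two hex fields of at most 8 digits; %n records how much was read. */
    if (sscanf(text, "%8X/%8X%n", &hi, &lo, &consumed) != 2)
        return false;
    if (text[consumed] != '\0')
        return false;

    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}

/* The wait is over once the replay position is at or past the target. */
static bool
lsn_reached(uint64_t target, uint64_t replayed)
{
    return target <= replayed;
}
```

Because an LSN is a plain 64-bit position in the WAL stream, the "has it been replayed yet" check reduces to an integer comparison.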
Testing the implementation
==========================
The implementation was tested with the testgres and unittest Python modules.
How to test this implementation:
Start the master server.
Create table test and insert tuple 1.
Set up asynchronous slave replication (9.5: wal_level = hot_standby; 9.6 or
higher: wal_level = replica).
Slave: START TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT * FROM test;
Master: delete the tuple, run VACUUM, and get the new LSN.
Slave: WAITLSN 'newLSN', 60000;
WAITLSN finishes without reaching the LSN (NOTICE "LSN is not reached").
Slave: COMMIT;
WAITLSN 'newLSN', 60000;
WAITLSN finishes successfully (without the NOTICE message).
As expected, WAITLSN waits for the LSN and stops on postmaster death,
an interrupt, or timeout.
Your feedback is welcome!
---
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company
diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 77667bd..72c5390 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
<!ENTITY update SYSTEM "update.sgml">
<!ENTITY vacuum SYSTEM "vacuum.sgml">
<!ENTITY values SYSTEM "values.sgml">
+<!ENTITY waitlsn SYSTEM "waitlsn.sgml">
<!-- applications and utilities -->
<!ENTITY clusterdb SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/waitlsn.sgml b/doc/src/sgml/ref/waitlsn.sgml
new file mode 100644
index 0000000..6a8bdca
--- /dev/null
+++ b/doc/src/sgml/ref/waitlsn.sgml
@@ -0,0 +1,108 @@
+<!--
+doc/src/sgml/ref/waitlsn.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="SQL-WAITLSN">
+ <indexterm zone="sql-waitlsn">
+ <primary>WAITLSN</primary>
+ </indexterm>
+
+ <refmeta>
+ <refentrytitle>WAITLSN</refentrytitle>
+ <manvolnum>7</manvolnum>
+ <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+ <refname>WAITLSN</refname>
+ <refpurpose>wait until the target <acronym>LSN</> has been replayed</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAITLSN <replaceable class="PARAMETER">'LSN'</replaceable> [ , <replaceable class="PARAMETER">delay</replaceable> ]
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+ <title>Description</title>
+
+ <para>
+ <command>WAITLSN</command> waits until the target <acronym>LSN</> has
+ been replayed, or until the optional <quote>delay</> (in milliseconds;
+ by default the wait is unbounded) has elapsed.
+ </para>
+
+ <para>
+ <command>WAITLSN</command> provides a simple
+ interprocess <acronym>LSN</> wait mechanism for backends on a slave
+ in a master-slave replication scheme of a <productname>PostgreSQL</productname> database.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Parameters</title>
+
+ <variablelist>
+ <varlistentry>
+ <term><replaceable class="PARAMETER">LSN</replaceable></term>
+ <listitem>
+ <para>
+ Target log sequence number to wait for.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><replaceable class="PARAMETER">delay</replaceable></term>
+ <listitem>
+ <para>
+ Time in milliseconds to wait for the LSN to be replayed.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
+ </refsect1>
+
+ <refsect1>
+ <title>Notes</title>
+
+ <para>
+ The delay to wait for the LSN to be replayed must be an integer. By
+ default the wait is unbounded. Waiting can be interrupted with Ctrl+C or
+ by postmaster death.
+ </para>
+ </refsect1>
+
+ <refsect1>
+ <title>Examples</title>
+
+ <para>
+ Configure and execute a waitlsn from
+ <application>psql</application>:
+
+<programlisting>
+WAITLSN '0/3F07A6B1', 10000;
+NOTICE: LSN is not reached. Try to make bigger delay.
+WAITLSN
+
+WAITLSN '0/3F07A611';
+WAITLSN
+
+WAITLSN '0/3F0FF791', 500000;
+^CCancel request sent
+NOTICE: LSN is not reached. Try to make bigger delay.
+ERROR: canceling statement due to user request
+</programlisting>
+</para>
+ </refsect1>
+
+ <refsect1>
+ <title>Compatibility</title>
+
+ <para>
+ There is no <command>WAITLSN</command> statement in the SQL
+ standard.
+ </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index 8acdff1..3733ad9 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -200,6 +200,7 @@
&update;
&vacuum;
&values;
+ &waitlsn;
</reference>
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f13f9c1..609c83e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -39,6 +39,7 @@
#include "catalog/pg_control.h"
#include "catalog/pg_database.h"
#include "commands/tablespace.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/bgwriter.h"
@@ -6922,6 +6923,11 @@ StartupXLOG(void)
break;
}
+ /*
+ * After updating lastReplayedEndRecPtr, set the latches in the SHMEM array
+ */
+ WaitLSNSetLatch();
+
/* Else, try to fetch the next WAL record */
record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 6b3742c..091cbe2 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
policy.o portalcmds.o prepare.o proclang.o \
schemacmds.o seclabel.o sequence.o tablecmds.o tablespace.o trigger.o \
tsearchcmds.o typecmds.o user.o vacuum.o vacuumlazy.o \
- variable.o view.o
+ variable.o view.o waitlsn.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 716f1c3..9ad3275 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -139,7 +139,6 @@
#include "utils/ps_status.h"
#include "utils/timestamp.h"
-
/*
* Maximum size of a NOTIFY payload, including terminating NULL. This
* must be kept small enough so that a notification message fits on one
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000..b441b85
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,195 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ * WaitLSN statement: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*-------------------------------------------------------------------------
+ * Wait for LSN to be replayed on slave as of 9.5:
+ * README
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "utils/pg_lsn.h"
+#include "storage/latch.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "storage/backendid.h"
+#include "access/xact.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "access/xlog_fn.h"
+#include "utils/timestamp.h"
+#include "storage/pmsignal.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/waitlsn.h"
+
+
+/* Latches Own-DisownLatch and AbortCallBack */
+static uint32 GetSHMEMSize(void);
+static void WLDisownLatchAbort(XactEvent event, void *arg);
+static void WLOwnLatch(void);
+static void WLDisownLatch(void);
+
+void _PG_init(void);
+
+/* Shared memory structures */
+typedef struct
+{
+ int pid;
+ volatile slock_t slock;
+ Latch latch;
+} BIDLatch;
+
+typedef struct
+{
+ int backend_maxid;
+ BIDLatch l_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+static volatile GlobState *state;
+bool is_latch_owned = false;
+
+static void
+WLOwnLatch(void)
+{
+ SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+ OwnLatch(&state->l_arr[MyBackendId].latch);
+ is_latch_owned = true;
+ if (MyBackendId > state->backend_maxid)
+ state->backend_maxid += 1;
+ state->l_arr[MyBackendId].pid = MyProcPid;
+ SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+static void
+WLDisownLatch(void)
+{
+ SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+ DisownLatch(&state->l_arr[MyBackendId].latch);
+ is_latch_owned = false;
+ if (MyBackendId == state->backend_maxid)
+ state->backend_maxid -= 1;
+ state->l_arr[MyBackendId].pid = 0;
+ SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+/* CallBack function */
+static void
+WLDisownLatchAbort(XactEvent event, void *arg)
+{
+ if (is_latch_owned && (event == XACT_EVENT_PARALLEL_ABORT ||
+ event == XACT_EVENT_ABORT))
+ {
+ WLDisownLatch();
+ }
+}
+
+/* Module load callback */
+void
+_PG_init(void)
+{
+ if (!IsUnderPostmaster)
+ RegisterXactCallback(WLDisownLatchAbort, NULL);
+}
+
+static uint32
+GetSHMEMSize(void)
+{
+ return offsetof(GlobState, l_arr) + sizeof(BIDLatch) * (MaxConnections+1);
+}
+
+void
+WaitLSNShmemInit(void)
+{
+ bool found;
+ uint i;
+
+ state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+ GetSHMEMSize(),
+ &found);
+ if (!found)
+ {
+ state->backend_maxid = 1;
+ for (i = 0; i < (MaxConnections+1); i++)
+ {
+ state->l_arr[i].pid = 0;
+ SpinLockInit(&state->l_arr[i].slock);
+ InitSharedLatch(&state->l_arr[i].latch);
+ }
+ }
+}
+
+void
+WaitLSNSetLatch(void)
+{
+ uint i;
+ for (i = 1; i < (state->backend_maxid+1); i++)
+ {
+ SpinLockAcquire(&state->l_arr[i].slock);
+ if (state->l_arr[i].pid != 0)
+ SetLatch(&state->l_arr[i].latch);
+ SpinLockRelease(&state->l_arr[i].slock);
+ }
+}
+
+void
+WaitLSNUtility(const char *lsn, const int *delay)
+{
+ XLogRecPtr trg_lsn;
+ XLogRecPtr cur_lsn;
+ int latch_events;
+ int tdelay = delay;
+ TimestampTz timer = GetCurrentTimestamp();
+ trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
+
+
+ if (delay > 0)
+ latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+ else
+ latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+ WLOwnLatch();
+
+ for (;;)
+ {
+ ResetLatch(&state->l_arr[MyBackendId].latch);
+ cur_lsn = GetXLogReplayRecPtr(NULL);
+
+ /* If the LSN has been replayed */
+ if (trg_lsn <= cur_lsn)
+ break;
+
+ /* If the postmaster dies, finish immediately */
+ if (!PostmasterIsAlive())
+ break;
+
+ /* If Delay time is over */
+ if (latch_events & WL_TIMEOUT)
+ {
+ tdelay -= (GetCurrentTimestamp() - timer) / 1000; /* TimestampTz is in usec, tdelay in ms */
+ if (tdelay <= 0)
+ break;
+ timer = GetCurrentTimestamp();
+ }
+
+ CHECK_FOR_INTERRUPTS();
+ WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay);
+ }
+
+ WLDisownLatch();
+
+ if (trg_lsn > cur_lsn)
+ elog(NOTICE,"LSN is not reached. Try to make bigger delay.");
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index cb5cfc4..5fb43f6 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -267,7 +267,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
DeallocateStmt PrepareStmt ExecuteStmt
DropOwnedStmt ReassignOwnedStmt
AlterTSConfigurationStmt AlterTSDictionaryStmt
- CreateMatViewStmt RefreshMatViewStmt CreateAmStmt
+ CreateMatViewStmt RefreshMatViewStmt CreateAmStmt WaitLSNStmt
%type <node> select_no_parens select_with_parens select_clause
simple_select values_clause
@@ -306,7 +306,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
%type <list> OptSchemaEltList
%type <boolean> TriggerForSpec TriggerForType
-%type <ival> TriggerActionTime
+%type <ival> TriggerActionTime WaitDelay
%type <list> TriggerEvents TriggerOneEvent
%type <value> TriggerFuncArg
%type <node> TriggerWhen
@@ -644,7 +644,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
VERBOSE VERSION_P VIEW VIEWS VOLATILE
- WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+ WAITLSN WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLPARSE
XMLPI XMLROOT XMLSERIALIZE
@@ -882,6 +882,7 @@ stmt :
| VariableSetStmt
| VariableShowStmt
| ViewStmt
+ | WaitLSNStmt
| /*EMPTY*/
{ $$ = NULL; }
;
@@ -12852,7 +12853,26 @@ frame_bound:
}
;
+/*****************************************************************************
+ *
+ * QUERY:
+ * WAITLSN <LSN> can appear as a query-level command
+ *
+ *
+ *****************************************************************************/
+WaitLSNStmt: WAITLSN Sconst WaitDelay
+ {
+ WaitLSNStmt *n = makeNode(WaitLSNStmt);
+ n->lsn = $2;
+ n->delay = $3;
+ $$ = (Node *)n;
+ }
+ ;
+WaitDelay:
+ ',' Iconst { $$ = $2; }
+ | /*EMPTY*/ { $$ = 0; }
+ ;
/*
* Supporting nonterminals for expressions.
*/
@@ -13908,6 +13928,7 @@ unreserved_keyword:
| VIEW
| VIEWS
| VOLATILE
+ | WAITLSN
| WHITESPACE_P
| WITHIN
| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index c04b17f..66001ae 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/twophase.h"
#include "commands/async.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/autovacuum.h"
@@ -254,6 +255,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
SyncScanShmemInit();
AsyncShmemInit();
+ /*
+ * Init array of Latches in SHMEM for WAITLSN
+ */
+ WaitLSNShmemInit();
+
#ifdef EXEC_BACKEND
/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index ac50c2a..6c2447d 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -54,6 +54,7 @@
#include "commands/user.h"
#include "commands/vacuum.h"
#include "commands/view.h"
+#include "commands/waitlsn.h"
#include "miscadmin.h"
#include "parser/parse_utilcmd.h"
#include "postmaster/bgwriter.h"
@@ -902,6 +903,20 @@ standard_ProcessUtility(Node *parsetree,
break;
}
+ case T_WaitLSNStmt:
+ {
+ WaitLSNStmt *stmt = (WaitLSNStmt *) parsetree;
+ if (!RecoveryInProgress())
+ {
+ ereport(ERROR,(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
+ errmsg("cannot execute %s not during recovery",
+ "WaitLSN")));
+ }
+ else
+ WaitLSNUtility(stmt->lsn, stmt->delay);
+ }
+ break;
+
default:
/* All other statement types have event trigger support */
ProcessUtilitySlow(parsetree, queryString,
@@ -2359,6 +2374,10 @@ CreateCommandTag(Node *parsetree)
tag = "NOTIFY";
break;
+ case T_WaitLSNStmt:
+ tag = "WAITLSN";
+ break;
+
case T_ListenStmt:
tag = "LISTEN";
break;
@@ -2951,6 +2970,10 @@ GetCommandLogLevel(Node *parsetree)
lev = LOGSTMT_ALL;
break;
+ case T_WaitLSNStmt:
+ lev = LOGSTMT_ALL;
+ break;
+
case T_ListenStmt:
lev = LOGSTMT_ALL;
break;
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000..12d224e
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,20 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ * WaitLSN notification: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAITLSN_H
+#define WAITLSN_H
+
+extern void WaitLSNUtility(const char *lsn, const int *delay);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatch(void);
+
+#endif /* WAITLSN_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 2f7efa8..8b9fc2b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -463,6 +463,7 @@ typedef enum NodeTag
T_DropReplicationSlotCmd,
T_StartReplicationCmd,
T_TimeLineHistoryCmd,
+ T_WaitLSNStmt,
/*
* TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1481fff..ee8e0f3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3101,4 +3101,15 @@ typedef struct AlterTSConfigurationStmt
bool missing_ok; /* for DROP - skip error if missing? */
} AlterTSConfigurationStmt;
+/* ----------------------
+ * WaitLSN Statement
+ * ----------------------
+ */
+typedef struct WaitLSNStmt
+{
+ NodeTag type;
+ char *lsn; /* Target LSN to wait for */
+ int *delay; /* Delay to wait for LSN */
+} WaitLSNStmt;
+
#endif /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 17ffef5..b14193e 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -422,6 +422,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("waitlsn", WAITLSN, UNRESERVED_KEYWORD)
PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)