Hi hackers,

Few days earlier I've finished my work on WAITLSN statement utility, so I’d like to share it.


Introduction
============

Our clients who deal with 9.5 and use asynchronous master-slave replication, asked to make the wait-mechanism on the slave side to prevent the situation when slave handles query which needs data (LSN) that was received, flushed, but still not replayed.


Problem description
===================

The implementation:
Must handle the wait-mechanism using pg_sleep() in order not to load system
Must avoid race conditions if different backend want to wait for different LSN
Must not take snapshot of DB, to avoid troubles with sudden minXID change
Must have optional timeout parameter if LSN traffic has stalled.
Must release on postmaster’s death or interrupts.


Implementation
==============

To avoid troubles with snapshots, WAITLSN was implemented as a utility statement, this allows us to circumvent the snapshot-taking mechanism.
We tried different variants and the most effective way was to use Latches.
To handle interprocess interaction all Latches are stored in shared memory and to cope with race conditions, each Latch is protected by a Spinlock.
Timeout was made optional parameter, it is set in milliseconds.


What works
==========

Actually, it works well even with significant timeout or wait period values, but of course there might be things I've overlooked.

How to use it
==========

WAITLSN ‘LSN’ [, timeout in ms];

#Wait until LSN 0/303EC60 will be replayed, or 10 second passed.
WAITLSN ‘0/303EC60’, 10000;

#Or same without timeout.
WAITLSN ‘0/303EC60’;

Notice: WAITLSN will release on PostmasterDeath or Interruption events if they come earlier then LSN or timeout.

Testing the implementation
======================

The implementation was tested with testgres and unittest python modules.

How to test this implementation:
Start master server
Make table test, insert tuple 1
Make asynchronous slave replication (9.5 wal_level = standby, 9.6 or higher wal_level = replica)
Slave: START TRANSACTION ISOLATION LEVEL REPEATABLE READ ;
                SELECT * FROM test;
Master: delete tuple + make vacuum + get new LSN
Slave: WAITLSN ‘newLSN’, 60000;
                Waitlsn finished with FALSE “LSN doesn`t reached”
Slave: COMMIT;
                WAITLSN ‘newLSN’, 60000;
                Waitlsn finished with success (without NOTICE message)

The WAITLSN as expected wait LSN, and interrupts on PostmasterDeath, interrupts or timeout.

Your feedback is welcome!


---
Ivan Kartyshov
Postgres Professional: http://www.postgrespro.com
The Russian Postgres Company

diff --git a/doc/src/sgml/ref/allfiles.sgml b/doc/src/sgml/ref/allfiles.sgml
index 77667bd..72c5390 100644
--- a/doc/src/sgml/ref/allfiles.sgml
+++ b/doc/src/sgml/ref/allfiles.sgml
@@ -172,6 +172,7 @@ Complete list of usable sgml source files in this directory.
 <!ENTITY update             SYSTEM "update.sgml">
 <!ENTITY vacuum             SYSTEM "vacuum.sgml">
 <!ENTITY values             SYSTEM "values.sgml">
+<!ENTITY waitlsn            SYSTEM "waitlsn.sgml">
 
 <!-- applications and utilities -->
 <!ENTITY clusterdb          SYSTEM "clusterdb.sgml">
diff --git a/doc/src/sgml/ref/waitlsn.sgml b/doc/src/sgml/ref/waitlsn.sgml
new file mode 100644
index 0000000..6a8bdca
--- /dev/null
+++ b/doc/src/sgml/ref/waitlsn.sgml
@@ -0,0 +1,108 @@
+<!--
+doc/src/sgml/ref/waitlsn.sgml
+PostgreSQL documentation
+-->
+
+<refentry id="SQL-WAITLSN">
+ <indexterm zone="sql-waitlsn">
+  <primary>WAITLSN</primary>
+ </indexterm>
+
+ <refmeta>
+  <refentrytitle>WAITLSN</refentrytitle>
+  <manvolnum>7</manvolnum>
+  <refmiscinfo>SQL - Language Statements</refmiscinfo>
+ </refmeta>
+
+ <refnamediv>
+  <refname>WAITLSN</refname>
+  <refpurpose>wait when target <acronym>LSN</> been replayed</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+WAITLSN <replaceable class="PARAMETER">'LSN'</replaceable> [ , <replaceable class="PARAMETER">delay</replaceable> ]
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   The <command>WAITLSN</command> wait till target <acronym>LSN</> will
+   be replayed with an optional <quote>delay</> (milliseconds by default
+   infinity) to be wait for LSN to replayed.
+  </para>
+
+  <para>
+   <command>WAITLSN</command> provides a simple
+   interprocess <acronym>LSN</> wait mechanism for a backends on slave
+   in master-slave replication scheme on <productname>PostgreSQL</productname> database.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Parameters</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><replaceable class="PARAMETER">LSN</replaceable></term>
+    <listitem>
+     <para>
+      Target log sequence number to be wait for.
+     </para>
+    </listitem>
+   </varlistentry>
+   <varlistentry>
+    <term><replaceable class="PARAMETER">delay</replaceable></term>
+    <listitem>
+     <para>
+      Time in miliseconds to waiting for LSN to be replayed.
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Notes</title>
+
+  <para>
+   Delay time to waiting for LSN to be replayed must be integer. For
+   default it is infinity. Waiting can be interupped using Ctl+C, or
+   by Postmaster death.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Examples</title>
+
+  <para>
+   Configure and execute a waitlsn from
+   <application>psql</application>:
+
+<programlisting>
+WAITLSN '0/3F07A6B1', 10000;
+NOTICE:  LSN is not reached. Try to make bigger delay.
+WAITLSN
+
+WAITLSN '0/3F07A611';
+WAITLSN
+
+WAITLSN '0/3F0FF791', 500000;
+^CCancel request sent
+NOTICE:  LSN is not reached. Try to make bigger delay.
+ERROR:  canceling statement due to user request
+</programlisting>
+</para>
+ </refsect1>
+
+ <refsect1>
+  <title>Compatibility</title>
+
+  <para>
+   There is no <command>WAITLSN</command> statement in the SQL
+   standard.
+  </para>
+ </refsect1>
+</refentry>
diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
index 8acdff1..3733ad9 100644
--- a/doc/src/sgml/reference.sgml
+++ b/doc/src/sgml/reference.sgml
@@ -200,6 +200,7 @@
    &update;
    &vacuum;
    &values;
+   &waitlsn;
 
  </reference>
 
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index f13f9c1..609c83e 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -39,6 +39,7 @@
 #include "catalog/pg_control.h"
 #include "catalog/pg_database.h"
 #include "commands/tablespace.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/bgwriter.h"
@@ -6922,6 +6923,11 @@ StartupXLOG(void)
 					break;
 				}
 
+				/*
+				 * After update lastReplayedEndRecPtr set Latches in SHMEM array
+				 */
+				WaitLSNSetLatch();
+
 				/* Else, try to fetch the next WAL record */
 				record = ReadRecord(xlogreader, InvalidXLogRecPtr, LOG, false);
 			} while (record != NULL);
diff --git a/src/backend/commands/Makefile b/src/backend/commands/Makefile
index 6b3742c..091cbe2 100644
--- a/src/backend/commands/Makefile
+++ b/src/backend/commands/Makefile
@@ -20,6 +20,6 @@ OBJS = amcmds.o aggregatecmds.o alter.o analyze.o async.o cluster.o comment.o \
 	policy.o portalcmds.o prepare.o proclang.o \
 	schemacmds.o seclabel.o sequence.o tablecmds.o tablespace.o trigger.o \
 	tsearchcmds.o typecmds.o user.o vacuum.o vacuumlazy.o \
-	variable.o view.o
+	variable.o view.o waitlsn.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 716f1c3..9ad3275 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -139,7 +139,6 @@
 #include "utils/ps_status.h"
 #include "utils/timestamp.h"
 
-
 /*
  * Maximum size of a NOTIFY payload, including terminating NULL.  This
  * must be kept small enough so that a notification message fits on one
diff --git a/src/backend/commands/waitlsn.c b/src/backend/commands/waitlsn.c
new file mode 100644
index 0000000..b441b85
--- /dev/null
+++ b/src/backend/commands/waitlsn.c
@@ -0,0 +1,195 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.c
+ *	  WaitLSN statment: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/commands/waitlsn.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/*-------------------------------------------------------------------------
+ * Wait for LSN been replayed on slave as of 9.5:
+ * README
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "utils/pg_lsn.h"
+#include "storage/latch.h"
+#include "miscadmin.h"
+#include "storage/spin.h"
+#include "storage/backendid.h"
+#include "access/xact.h"
+#include "storage/shmem.h"
+#include "storage/ipc.h"
+#include "access/xlog_fn.h"
+#include "utils/timestamp.h"
+#include "storage/pmsignal.h"
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "commands/waitlsn.h"
+
+
+/* Latches Own-DisownLatch and AbortCаllBack */
+static uint32 GetSHMEMSize(void);
+static void WLDisownLatchAbort(XactEvent event, void *arg);
+static void WLOwnLatch(void);
+static void WLDisownLatch(void);
+
+void		_PG_init(void);
+
+/* Shared memory structures */
+typedef struct
+{
+	int					pid;
+	volatile slock_t	slock;
+	Latch				latch;
+} BIDLatch;
+
+typedef struct
+{
+	int			backend_maxid;
+	BIDLatch	l_arr[FLEXIBLE_ARRAY_MEMBER];
+} GlobState;
+
+static volatile GlobState  *state;
+bool						is_latch_owned = false;
+
+static void
+WLOwnLatch(void)
+{
+	SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+	OwnLatch(&state->l_arr[MyBackendId].latch);
+	is_latch_owned = true;
+	if (MyBackendId > state->backend_maxid)
+		state->backend_maxid += 1;
+	state->l_arr[MyBackendId].pid = MyProcPid;
+	SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+static void
+WLDisownLatch(void)
+{
+	SpinLockAcquire(&state->l_arr[MyBackendId].slock);
+	DisownLatch(&state->l_arr[MyBackendId].latch);
+	is_latch_owned = false;
+	if (MyBackendId = state->backend_maxid)
+		state->backend_maxid -= 1;
+	state->l_arr[MyBackendId].pid = 0;
+	SpinLockRelease(&state->l_arr[MyBackendId].slock);
+}
+
+/* CallBack function */
+static void
+WLDisownLatchAbort(XactEvent event, void *arg)
+{
+	if (is_latch_owned && (event == XACT_EVENT_PARALLEL_ABORT ||
+						   event == XACT_EVENT_ABORT))
+	{
+		WLDisownLatch();
+	}
+}
+
+/* Module load callback */
+void
+_PG_init(void)
+{
+	if (!IsUnderPostmaster)
+		RegisterXactCallback(WLDisownLatchAbort, NULL);
+}
+
+static uint32
+GetSHMEMSize(void)
+{
+	return offsetof(GlobState, l_arr) + sizeof(BIDLatch) * (MaxConnections+1);
+}
+
+void
+WaitLSNShmemInit(void)
+{
+	bool	found;
+	uint	i;
+
+	state = (GlobState *) ShmemInitStruct("pg_wait_lsn",
+										  GetSHMEMSize(),
+										  &found);
+	if (!found)
+	{
+		state->backend_maxid = 1;
+		for (i = 0; i < (MaxConnections+1); i++)
+		{
+			state->l_arr[i].pid = 0;
+			SpinLockInit(&state->l_arr[i].slock);
+			InitSharedLatch(&state->l_arr[i].latch);
+		}
+	}
+}
+
+void
+WaitLSNSetLatch(void)
+{
+	uint i;
+	for (i = 1; i < (state->backend_maxid+1); i++)
+	{
+		SpinLockAcquire(&state->l_arr[i].slock);
+		if (state->l_arr[i].pid != 0)
+			SetLatch(&state->l_arr[i].latch);
+		SpinLockRelease(&state->l_arr[i].slock);
+	}
+}
+
+void
+WaitLSNUtility(const char *lsn, const int *delay)
+{
+	XLogRecPtr	trg_lsn;
+	XLogRecPtr	cur_lsn;
+	int			latch_events;
+	int			tdelay = delay;
+	TimestampTz	timer = GetCurrentTimestamp();
+	trg_lsn = DatumGetLSN(DirectFunctionCall1(pg_lsn_in, CStringGetDatum(lsn)));
+
+
+	if (delay > 0)
+		latch_events = WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH;
+	else
+		latch_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
+
+	WLOwnLatch();
+
+	for (;;)
+	{
+		ResetLatch(&state->l_arr[MyBackendId].latch);
+		cur_lsn = GetXLogReplayRecPtr(NULL);
+
+		/* If LSN had been Replayed */
+		if (trg_lsn <= cur_lsn)
+			break;
+
+		/* If the postmaster dies, finish immediately */
+		if (!PostmasterIsAlive())
+			break;
+
+		/* If Delay time is over */
+		if (latch_events & WL_TIMEOUT)
+		{
+			tdelay -= (GetCurrentTimestamp() - timer);
+			if (tdelay <= 0)
+				break;
+			timer = GetCurrentTimestamp();
+		}
+
+		CHECK_FOR_INTERRUPTS();
+		WaitLatch(&state->l_arr[MyBackendId].latch, latch_events, tdelay);
+	}
+
+	WLDisownLatch();
+
+	if (trg_lsn > cur_lsn)
+		elog(NOTICE,"LSN is not reached. Try to make bigger delay.");
+}
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index cb5cfc4..5fb43f6 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -267,7 +267,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 		DeallocateStmt PrepareStmt ExecuteStmt
 		DropOwnedStmt ReassignOwnedStmt
 		AlterTSConfigurationStmt AlterTSDictionaryStmt
-		CreateMatViewStmt RefreshMatViewStmt CreateAmStmt
+		CreateMatViewStmt RefreshMatViewStmt CreateAmStmt WaitLSNStmt
 
 %type <node>	select_no_parens select_with_parens select_clause
 				simple_select values_clause
@@ -306,7 +306,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <list>	OptSchemaEltList
 
 %type <boolean> TriggerForSpec TriggerForType
-%type <ival>	TriggerActionTime
+%type <ival>	TriggerActionTime WaitDelay
 %type <list>	TriggerEvents TriggerOneEvent
 %type <value>	TriggerFuncArg
 %type <node>	TriggerWhen
@@ -644,7 +644,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	VACUUM VALID VALIDATE VALIDATOR VALUE_P VALUES VARCHAR VARIADIC VARYING
 	VERBOSE VERSION_P VIEW VIEWS VOLATILE
 
-	WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
+	WAITLSN WHEN WHERE WHITESPACE_P WINDOW WITH WITHIN WITHOUT WORK WRAPPER WRITE
 
 	XML_P XMLATTRIBUTES XMLCONCAT XMLELEMENT XMLEXISTS XMLFOREST XMLPARSE
 	XMLPI XMLROOT XMLSERIALIZE
@@ -882,6 +882,7 @@ stmt :
 			| VariableSetStmt
 			| VariableShowStmt
 			| ViewStmt
+			| WaitLSNStmt
 			| /*EMPTY*/
 				{ $$ = NULL; }
 		;
@@ -12852,7 +12853,26 @@ frame_bound:
 				}
 		;
 
+/*****************************************************************************
+ *
+ *		QUERY:
+ *				WAITLSN <LSN> can appear as a query-level command
+ *
+ *
+ *****************************************************************************/
 
+WaitLSNStmt: WAITLSN Sconst WaitDelay
+				{
+					WaitLSNStmt *n = makeNode(WaitLSNStmt);
+					n->lsn = $2;
+					n->delay = $3;
+					$$ = (Node *)n;
+				}
+		;
+WaitDelay:
+			',' Iconst							{ $$ = $2; }
+			| /*EMPTY*/							{ $$ = 0; }
+		;
 /*
  * Supporting nonterminals for expressions.
  */
@@ -13908,6 +13928,7 @@ unreserved_keyword:
 			| VIEW
 			| VIEWS
 			| VOLATILE
+			| WAITLSN
 			| WHITESPACE_P
 			| WITHIN
 			| WITHOUT
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index c04b17f..66001ae 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
 #include "access/subtrans.h"
 #include "access/twophase.h"
 #include "commands/async.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
@@ -254,6 +255,11 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 
+	/*
+	 * Init array of Latches  in SHMEM for WAITLSN
+	 */
+	WaitLSNShmemInit();
+
 #ifdef EXEC_BACKEND
 
 	/*
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index ac50c2a..6c2447d 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -54,6 +54,7 @@
 #include "commands/user.h"
 #include "commands/vacuum.h"
 #include "commands/view.h"
+#include "commands/waitlsn.h"
 #include "miscadmin.h"
 #include "parser/parse_utilcmd.h"
 #include "postmaster/bgwriter.h"
@@ -902,6 +903,20 @@ standard_ProcessUtility(Node *parsetree,
 				break;
 			}
 
+		case T_WaitLSNStmt:
+			{
+				WaitLSNStmt *stmt = (WaitLSNStmt *) parsetree;
+				if (!RecoveryInProgress())
+				{
+					ereport(ERROR,(errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), 
+							errmsg("cannot execute %s not during recovery",
+							"WaitLSN")));
+				}
+				else
+					WaitLSNUtility(stmt->lsn, stmt->delay);
+			}
+			break;
+
 		default:
 			/* All other statement types have event trigger support */
 			ProcessUtilitySlow(parsetree, queryString,
@@ -2359,6 +2374,10 @@ CreateCommandTag(Node *parsetree)
 			tag = "NOTIFY";
 			break;
 
+		case T_WaitLSNStmt:
+			tag = "WAITLSN";
+			break;
+
 		case T_ListenStmt:
 			tag = "LISTEN";
 			break;
@@ -2951,6 +2970,10 @@ GetCommandLogLevel(Node *parsetree)
 			lev = LOGSTMT_ALL;
 			break;
 
+		case T_WaitLSNStmt:
+			lev = LOGSTMT_ALL;
+			break;
+
 		case T_ListenStmt:
 			lev = LOGSTMT_ALL;
 			break;
diff --git a/src/include/commands/waitlsn.h b/src/include/commands/waitlsn.h
new file mode 100644
index 0000000..12d224e
--- /dev/null
+++ b/src/include/commands/waitlsn.h
@@ -0,0 +1,20 @@
+/*-------------------------------------------------------------------------
+ *
+ * waitlsn.h
+ *	  WaitLSN notification: WAITLSN
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 2016, Regents of PostgresPRO
+ *
+ * src/include/commands/waitlsn.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef WAITLSN_H
+#define WAITLSN_H
+
+extern void WaitLSNUtility(const char *lsn, const int *delay);
+extern void WaitLSNShmemInit(void);
+extern void WaitLSNSetLatch(void);
+
+#endif   /* WAITLSN_H */
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 2f7efa8..8b9fc2b 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -463,6 +463,7 @@ typedef enum NodeTag
 	T_DropReplicationSlotCmd,
 	T_StartReplicationCmd,
 	T_TimeLineHistoryCmd,
+	T_WaitLSNStmt,
 
 	/*
 	 * TAGS FOR RANDOM OTHER STUFF
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 1481fff..ee8e0f3 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -3101,4 +3101,15 @@ typedef struct AlterTSConfigurationStmt
 	bool		missing_ok;		/* for DROP - skip error if missing? */
 } AlterTSConfigurationStmt;
 
+/* ----------------------
+ *		WaitLSN Statement
+ * ----------------------
+ */
+typedef struct WaitLSNStmt
+{
+	NodeTag		type;
+	char	   *lsn;			/* Taraget LSN to wait for */
+	int		   *delay;			/* Delay to wait for LSN*/
+} WaitLSNStmt;
+
 #endif   /* PARSENODES_H */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 17ffef5..b14193e 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -422,6 +422,7 @@ PG_KEYWORD("version", VERSION_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("view", VIEW, UNRESERVED_KEYWORD)
 PG_KEYWORD("views", VIEWS, UNRESERVED_KEYWORD)
 PG_KEYWORD("volatile", VOLATILE, UNRESERVED_KEYWORD)
+PG_KEYWORD("waitlsn", WAITLSN, UNRESERVED_KEYWORD)
 PG_KEYWORD("when", WHEN, RESERVED_KEYWORD)
 PG_KEYWORD("where", WHERE, RESERVED_KEYWORD)
 PG_KEYWORD("whitespace", WHITESPACE_P, UNRESERVED_KEYWORD)

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to