On 14 December 2015 at 11:28, Craig Ringer <cr...@2ndquadrant.com> wrote:

> Hi all
>
> Attached is a patch against 9.6 to add support for informing logical
> decoding plugins of the new sequence last_value when sequence advance WAL
> records are processed during decoding.
>


Attached is a slightly updated version. It just produces less spam in the
regression tests, by adding a new option to test_decoding to show
sequences, which is not enabled except in sequence-specific tests.


-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 8ee88b0850fea17086a293c8afee83af3a6b95e1 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Mon, 14 Dec 2015 15:07:30 +0800
Subject: [PATCH] Logical decoding for sequence advances

Add a new logical decoding callback seq_advance_cb that's invoked
whenever we decode a new sequence chunk allocation from WAL.

Also add support for it in test_decoding.

It's guaranteed that the last_value passed to the callback is equal to
or greater than (for ascending sequences) any value returned by a
nextval() call visible to any transaction committed at or before the
callback is invoked.

In practice it'll be quite a lot larger because PostgreSQL
preallocates large chunks of sequence values to minimise WAL writes,
at the cost of wasting values if it has to do crash recovery. Logical
decoding sees the crash recovery position whenever it gets advanced.

This is needed to make logical-replication-based failover possible.
---
 contrib/test_decoding/Makefile              |   2 +-
 contrib/test_decoding/expected/sequence.out | 108 ++++++++++++++++++++++++++++
 contrib/test_decoding/sql/sequence.sql      |  35 +++++++++
 contrib/test_decoding/test_decoding.c       |  33 +++++++++
 src/backend/replication/logical/decode.c    |  60 +++++++++++++++-
 src/backend/replication/logical/logical.c   |  28 +++++++-
 src/include/replication/logical.h           |   3 +
 src/include/replication/output_plugin.h     |  16 +++++
 8 files changed, 282 insertions(+), 3 deletions(-)
 create mode 100644 contrib/test_decoding/expected/sequence.out
 create mode 100644 contrib/test_decoding/sql/sequence.sql

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a362e69..91e2765 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
-	binary prepared replorigin
+	binary prepared replorigin sequence
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/sequence.out b/contrib/test_decoding/expected/sequence.out
new file mode 100644
index 0000000..a13d29a
--- /dev/null
+++ b/contrib/test_decoding/expected/sequence.out
@@ -0,0 +1,108 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE SEQUENCE test_seq;
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 100);
+ sum  
+------
+ 5050
+(1 row)
+
+ALTER SEQUENCE test_seq RESTART WITH 2;
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 100);
+ sum  
+------
+ 5150
+(1 row)
+
+ALTER SEQUENCE test_seq INCREMENT BY 5;
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 100);
+  sum  
+-------
+ 35350
+(1 row)
+
+ALTER SEQUENCE test_seq CACHE 10000;
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 15000);
+    sum    
+-----------
+ 571552500
+(1 row)
+
+DROP SEQUENCE test_seq;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+                 data                 
+--------------------------------------
+ sequence test_seq advanced to 1
+ sequence test_seq advanced to 33
+ sequence test_seq advanced to 66
+ sequence test_seq advanced to 99
+ sequence test_seq advanced to 132
+ sequence test_seq advanced to 2
+ sequence test_seq advanced to 34
+ sequence test_seq advanced to 67
+ sequence test_seq advanced to 100
+ sequence test_seq advanced to 133
+ sequence test_seq advanced to 101
+ sequence test_seq advanced to 266
+ sequence test_seq advanced to 431
+ sequence test_seq advanced to 596
+ sequence test_seq advanced to 761
+ sequence test_seq advanced to 601
+ sequence test_seq advanced to 50761
+ sequence test_seq advanced to 100761
+(18 rows)
+
+CREATE SEQUENCE test2_seq MINVALUE 100 MAXVALUE 200 START 200 INCREMENT BY -1 CYCLE;
+SELECT sum(nextval('test2_seq')) FROM generate_series(1, 300);
+  sum  
+-------
+ 45147
+(1 row)
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+                 data                 
+--------------------------------------
+ sequence test_seq advanced to 1
+ sequence test_seq advanced to 33
+ sequence test_seq advanced to 66
+ sequence test_seq advanced to 99
+ sequence test_seq advanced to 132
+ sequence test_seq advanced to 2
+ sequence test_seq advanced to 34
+ sequence test_seq advanced to 67
+ sequence test_seq advanced to 100
+ sequence test_seq advanced to 133
+ sequence test_seq advanced to 101
+ sequence test_seq advanced to 266
+ sequence test_seq advanced to 431
+ sequence test_seq advanced to 596
+ sequence test_seq advanced to 761
+ sequence test_seq advanced to 601
+ sequence test_seq advanced to 50761
+ sequence test_seq advanced to 100761
+ sequence test2_seq advanced to 200
+ sequence test2_seq advanced to 168
+ sequence test2_seq advanced to 135
+ sequence test2_seq advanced to 102
+ sequence test2_seq advanced to 100
+ sequence test2_seq advanced to 168
+ sequence test2_seq advanced to 135
+ sequence test2_seq advanced to 102
+ sequence test2_seq advanced to 100
+ sequence test2_seq advanced to 168
+ sequence test2_seq advanced to 135
+ sequence test2_seq advanced to 102
+(30 rows)
+
+DROP SEQUENCE test2_seq;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+ 
+(1 row)
+
diff --git a/contrib/test_decoding/sql/sequence.sql b/contrib/test_decoding/sql/sequence.sql
new file mode 100644
index 0000000..cf9dba3
--- /dev/null
+++ b/contrib/test_decoding/sql/sequence.sql
@@ -0,0 +1,35 @@
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE SEQUENCE test_seq;
+
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 100);
+
+ALTER SEQUENCE test_seq RESTART WITH 2;
+
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 100);
+
+ALTER SEQUENCE test_seq INCREMENT BY 5;
+
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 100);
+
+ALTER SEQUENCE test_seq CACHE 10000;
+
+SELECT sum(nextval('test_seq')) FROM generate_series(1, 15000);
+
+DROP SEQUENCE test_seq;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+
+CREATE SEQUENCE test2_seq MINVALUE 100 MAXVALUE 200 START 200 INCREMENT BY -1 CYCLE;
+
+SELECT sum(nextval('test2_seq')) FROM generate_series(1, 300);
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-sequences', '1');
+
+DROP SEQUENCE test2_seq;
+
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 80fc5f4..1197524 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -45,6 +45,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		include_sequences;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -64,6 +65,9 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id);
 
+static void pg_decode_seq_advance(LogicalDecodingContext *ctx,
+		const char * seq_name, uint64 last_value);
+
 void
 _PG_init(void)
 {
@@ -82,6 +86,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
+	cb->seq_advance_cb = pg_decode_seq_advance;
 }
 
 
@@ -172,6 +177,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include-sequences") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->include_sequences = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -251,6 +267,23 @@ pg_decode_filter(LogicalDecodingContext *ctx,
 	return false;
 }
 
+static void
+pg_decode_seq_advance(LogicalDecodingContext *ctx, const char * seq_name,
+		uint64 last_value)
+{
+	TestDecodingData *data = ctx->output_plugin_private;
+
+	/* sequence output is opt-in via the "include-sequences" option */
+	if (!data->include_sequences)
+		return;
+
+	OutputPluginPrepareWrite(ctx, true);
+	/* sequence values are signed, so print as int64 to keep negatives intact */
+	appendStringInfo(ctx->out, "sequence %s advanced to " INT64_FORMAT,
+			seq_name, (int64) last_value);
+	OutputPluginWrite(ctx, true);
+}
+
 /*
  * Print literal `outputstr' already represented as string of type `typid'
  * into stringbuf `s'.
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9f60687..c32f509 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -37,6 +37,8 @@
 
 #include "catalog/pg_control.h"
 
+#include "commands/sequence.h"
+
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/reorderbuffer.h"
@@ -58,6 +60,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeSeqOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -71,6 +74,9 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 			xl_xact_parsed_abort *parsed, TransactionId xid);
 
+static void DecodeSeqAdvance(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+			Form_pg_sequence seq);
+
 /* common function to decode tuples */
 static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
 
@@ -107,6 +113,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeStandbyOp(ctx, &buf);
 			break;
 
+		case RM_SEQ_ID:
+			DecodeSeqOp(ctx, &buf);
+			break;
+
 		case RM_HEAP2_ID:
 			DecodeHeap2Op(ctx, &buf);
 			break;
@@ -130,7 +140,6 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 		case RM_HASH_ID:
 		case RM_GIN_ID:
 		case RM_GIST_ID:
-		case RM_SEQ_ID:
 		case RM_SPGIST_ID:
 		case RM_BRIN_ID:
 		case RM_COMMIT_TS_ID:
@@ -303,6 +312,30 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+static void
+DecodeSeqOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	uint8       info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+	xl_seq_rec *xlrec = (xl_seq_rec *) XLogRecGetData(buf->record);
+	HeapTupleHeader item;
+	Form_pg_sequence seq;
+
+	/* no point in doing anything until we have a full snapshot */
+	if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+		return;
+
+	/* an unexpected record must fail this decoding session, not the server */
+	if (info != XLOG_SEQ_LOG)
+		elog(ERROR, "DecodeSeqOp: unknown sequence xlog op code %u", info);
+
+	/* the xl_seq_rec is followed by the tuple's on-disk image (its header) */
+	item = (HeapTupleHeader) ((char *) xlrec + sizeof(xl_seq_rec));
+	/* the sequence row itself starts t_hoff bytes into that image */
+	seq = (Form_pg_sequence) ((char *) item + item->t_hoff);
+	DecodeSeqAdvance(ctx, buf, seq);
+}
+
 /*
  * Handle rmgr HEAP2_ID records for DecodeRecordIntoReorderBuffer().
  */
@@ -838,6 +871,31 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
 }
 
+static void
+DecodeSeqAdvance(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
+		Form_pg_sequence seq)
+{
+	RelFileNode seq_node;
+
+	/* Nothing to do unless the output plugin registered the callback */
+	if (ctx->callbacks.seq_advance_cb == NULL)
+		return;
+
+	/* Skip sequences that live in a database other than the slot's */
+	XLogRecGetBlockTag(buf->record, 0, &seq_node, NULL, NULL);
+	if (seq_node.dbNode != ctx->slot->data.database)
+		return;
+
+	/*
+	 * Sequence updates happen outside transaction boundaries, so the
+	 * change is not queued in a reorder buffer.  Instead the output
+	 * plugin callback is invoked right away, with buf->origptr passed
+	 * as the location of the change.
+	 */
+	seq_advance_cb_wrapper(ctx, buf->origptr, NameStr(seq->sequence_name),
+			seq->last_value);
+}
+
 
 /*
  * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 1ce9081..30527df 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -683,7 +683,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 
 	/* Push callback + info on the error context stack */
 	state.ctx = ctx;
-	state.callback_name = "shutdown";
+	state.callback_name = "filter_by_origin";
 	state.report_location = InvalidXLogRecPtr;
 	errcallback.callback = output_plugin_error_callback;
 	errcallback.arg = (void *) &state;
@@ -702,6 +702,32 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+void
+seq_advance_cb_wrapper(LogicalDecodingContext *ctx, XLogRecPtr change_lsn,
+		const char * seq_name, uint64 last_value)
+{
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "seq_advance";
+	state.report_location = change_lsn;
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state; a sequence advance belongs to no xact, hence no xid */
+	ctx->accept_writes = true;
+	ctx->write_xid = InvalidTransactionId;
+	ctx->write_location = change_lsn;
+	ctx->callbacks.seq_advance_cb(ctx, seq_name, last_value);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dfdbe65..ba9f936 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -99,4 +99,7 @@ extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
 
+extern void seq_advance_cb_wrapper(LogicalDecodingContext *ctx,
+		XLogRecPtr change_lsn, const char * seq_name, uint64 last_value);
+
 #endif
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 17c3de2..f6d6a10 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -88,6 +88,21 @@ typedef void (*LogicalDecodeShutdownCB) (
 );
 
 /*
+ * Called when a sequence has been advanced, i.e. a new chunk of values has
+ * been allocated and written to WAL.
+ *
+ * Because of caching this doesn't happen on every nextval() call.  It's
+ * guaranteed that the new sequence position will be at or ahead of the most
+ * recent value any committed xact has obtained when this callback is invoked.
+ */
+typedef void (*LogicalDecodeSeqAdvanceCB) (
+											struct LogicalDecodingContext *,
+											const char * seq_name,
+											uint64 last_value
+		);
+
+
+/*
  * Output plugin callbacks
  */
 typedef struct OutputPluginCallbacks
@@ -98,6 +113,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeCommitCB commit_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
+	LogicalDecodeSeqAdvanceCB seq_advance_cb;
 } OutputPluginCallbacks;
 
 void		OutputPluginPrepareWrite(struct LogicalDecodingContext *ctx, bool last_write);
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to