On 15 October 2015 at 20:11, Craig Ringer <cr...@2ndquadrant.com> wrote:
> On 15 October 2015 at 19:04, Andres Freund <and...@anarazel.de> wrote:
>
>> As far as I can see all the other places have it assigned.
>
> Ok, thanks. Not much need for a followup patch then, if you're not
> using the test changes part.

Here's what I used for my tests, anyway, including an updated fix.

You'll note that the tests fail. When the replication origin is reset
and set again with pg_replication_origin_xact_setup mid-xact, the
origin identity exposed to the decoding plugin callbacks for all
records (including those created before the origin change) is the
latter origin, the one active at COMMIT time.

Is that the intended behaviour? That the session identifier is
determined by what was active at commit time, and only the lsn and
timestamp vary throughout the xact? It looks like it from the code.

Should pg_replication_origin_xact_reset() and
pg_replication_origin_xact_setup() be permitted within a transaction?
Or is this just a "well, don't do that"?

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From 5efc1d3f9017ed8e178aa5a3986275bcbd504f30 Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 15 Oct 2015 20:45:06 +0800
Subject: [PATCH] Decode replication origin in commit

Fix an oversight in replication origins where the origin_id isn't stored
while decoding the commit record. Prior to this fix the begin and commit
decoding callbacks were always passed InvalidRepOriginId (0) in
txn->origin_id even for remotely-originated transactions.

The replication origin filter hook
OutputPluginCallbacks->filter_by_origin_cb has always been passed the
origin correctly, so test_decoding didn't notice this issue. It only
tested filtering out remote transactions. So this change adds support
for outputting the origin information on decoded transactions in
test_decoding and add tests to exercise it.

Also add tests for changing origin within a transaction.
---
 contrib/test_decoding/expected/replorigin.out | 88 ++++++++++++++++++++++++++-
 contrib/test_decoding/sql/replorigin.sql      | 35 +++++++++++
 contrib/test_decoding/test_decoding.c         | 50 ++++++++++++++-
 src/backend/replication/logical/decode.c      |  2 +-
 4 files changed, 170 insertions(+), 5 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..11f9a2b 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -12,11 +12,18 @@ SELECT pg_replication_origin_create('test_decoding: regression_slot');
 SELECT pg_replication_origin_create('test_decoding: regression_slot');
 ERROR:  duplicate key value violates unique constraint "pg_replication_origin_roname_index"
 DETAIL:  Key (roname)=(test_decoding: regression_slot) already exists.
+-- Create a second origin too
+SELECT pg_replication_origin_create('test_decoding: regression_slot 2');
+ pg_replication_origin_create 
+------------------------------
+                            2
+(1 row)
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('test_decoding: temp');
  pg_replication_origin_create 
 ------------------------------
-                            2
+                            3
 (1 row)
 
 SELECT pg_replication_origin_drop('test_decoding: temp');
@@ -127,6 +134,85 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (3 rows)
 
+-- This time we'll prepare a series of transactions to decode in one go
+-- rather than decoding one by one.
+--
+-- First, to make sure remote-originated tx's are not filtered out when only-local is unset,
+-- we need another tx with an origin. This time we'll set a different origin for each
+-- change.
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+SELECT pg_replication_origin_xact_setup('0/aabbcd00', '2013-01-01 00:11');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will also be replicated even though remotely originated');
+-- Change the origin replication identifier mid-transaction
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot 2');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+SELECT pg_replication_origin_xact_setup('0/aabbcd01', '2013-01-01 00:13');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('from second origin');
+COMMIT;
+-- then run an empty tx, since this test will be setting skip-empty-xacts=0
+-- Note that we need to do something, just something that won't get decoded,
+-- to force a commit to be recorded.
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+                                                                                    data                                                                                     
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:4 data[text]:'will be replicated even though remotely originated' -- origin:'test_decoding: regression_slot'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:5 data[text]:'will also be replicated even though remotely originated' -- origin:'test_decoding: regression_slot'@0/AABBCD01
+ table public.origin_tbl: INSERT: id[integer]:6 data[text]:'from second origin' -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ COMMIT -- origin:'test_decoding: regression_slot 2'@0/AABBCD01
+ BEGIN -- origin:'test_decoding: regression_slot 2'@1/AABBCCFF
+ COMMIT -- origin:'test_decoding: regression_slot 2'@1/AABBCCFF
+(7 rows)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..252c786 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -8,6 +8,9 @@ SELECT pg_replication_origin_create('test_decoding: regression_slot');
 -- ensure duplicate creations fail
 SELECT pg_replication_origin_create('test_decoding: regression_slot');
 
+-- Create a second origin too
+SELECT pg_replication_origin_create('test_decoding: regression_slot 2');
+
 --ensure deletions work (once)
 SELECT pg_replication_origin_create('test_decoding: temp');
 SELECT pg_replication_origin_drop('test_decoding: temp');
@@ -60,5 +63,37 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 INSERT INTO origin_tbl(data) VALUES ('will be replicated');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
 
+-- This time we'll prepare a series of transactions to decode in one go
+-- rather than decoding one by one.
+--
+-- First, to make sure remote-originated tx's are not filtered out when only-local is unset,
+-- we need another tx with an origin. This time we'll set a different origin for each
+-- change.
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+SELECT pg_replication_origin_xact_setup('0/aabbcd00', '2013-01-01 00:11');
+INSERT INTO origin_tbl(data) VALUES ('will also be replicated even though remotely originated');
+-- Change the origin replication identifier mid-transaction
+SELECT pg_replication_origin_session_reset();
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot 2');
+SELECT pg_replication_origin_xact_setup('0/aabbcd01', '2013-01-01 00:13');
+INSERT INTO origin_tbl(data) VALUES ('from second origin');
+COMMIT;
+
+-- then run an empty tx, since this test will be setting skip-empty-xacts=0
+-- Note that we need to do something, just something that won't get decoded,
+-- to force a commit to be recorded.
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+
+SELECT pg_replication_origin_session_reset();
+
 SELECT pg_drop_replication_slot('regression_slot');
 SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 80fc5f4..4cf35fd 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -45,6 +45,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		include_origins;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -64,6 +65,9 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id);
 
+static void appendOrigin(LogicalDecodingContext *ctx,
+				TestDecodingData *data, ReorderBufferTXN *txn);
+
 void
 _PG_init(void)
 {
@@ -103,6 +107,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->include_origins = false;
 
 	ctx->output_plugin_private = data;
 
@@ -172,6 +177,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include-origins") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->include_origins = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_origins))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -203,6 +219,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	if (data->skip_empty_xacts)
 		return;
 
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	pg_output_begin(ctx, data, txn, true);
 }
 
@@ -210,10 +228,14 @@ static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
 	OutputPluginPrepareWrite(ctx, last_write);
+
+	appendStringInfoString(ctx->out, "BEGIN");
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "BEGIN");
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	appendOrigin(ctx, data, txn);
+
 	OutputPluginWrite(ctx, last_write);
 }
 
@@ -237,6 +259,10 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		appendStringInfo(ctx->out, " (at %s)",
 						 timestamptz_to_str(txn->commit_time));
 
+	appendOrigin(ctx, data, txn);
+
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	OutputPluginWrite(ctx, true);
 }
 
@@ -469,5 +495,23 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 	MemoryContextSwitchTo(old);
 	MemoryContextReset(data->context);
 
+	appendOrigin(ctx, data, txn);
+
 	OutputPluginWrite(ctx, true);
 }
+
+static void
+appendOrigin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn)
+{
+	char *origin;
+
+	if (data->include_origins && txn->origin_id != InvalidRepOriginId)
+	{
+		if (!replorigin_by_oid(txn->origin_id, true, &origin))
+			origin = "[unknown]";
+		appendStringInfo(ctx->out, " -- origin:'%s'@%X/%X",
+				origin,
+				(uint32)(txn->origin_lsn >> 32),
+				(uint32)(txn->origin_lsn));
+	}
+}
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..9f60687 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -450,7 +450,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 {
 	XLogRecPtr	origin_lsn = InvalidXLogRecPtr;
 	XLogRecPtr	commit_time = InvalidXLogRecPtr;
-	XLogRecPtr	origin_id = InvalidRepOriginId;
+	XLogRecPtr	origin_id = XLogRecGetOrigin(buf->record);
 	int			i;
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to