Hi all

There's an oversight in replication origins support in logical
decoding, where the origin node ID isn't passed correctly to callbacks
except for the origin filter callback. All other callbacks see it as
InvalidRepOriginId.

It's a one-line fix, but I've added support in test_decoding to
validate the fix and expanded the test script to cover it.

Should be applied to 9.5 and 9.6.

-- 
 Craig Ringer                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From d59282209327a4def92d10284d1ff9a08819fb9f Mon Sep 17 00:00:00 2001
From: Craig Ringer <cr...@2ndquadrant.com>
Date: Thu, 15 Oct 2015 15:50:04 +0800
Subject: [PATCH] Expose origin_id to logical decoding callacks

Fix an oversight in replication origins where the origin_id isn't stored
while decoding the commit record. Prior to this fix the begin and commit
decoding callbacks were always passed InvalidRepOriginId (0) in
txn->origin_id even for remotely-originated transactions.

The replication origin filter hook
OutputPluginCallbacks->filter_by_origin_cb has always been passed the
origin correctly, so test_decoding didn't notice this issue. It only
tested filtering out remote transactions. So this change adds support
for outputting the origin information on decoded transactions in
test_decoding and add tests to exercise it.
---
 contrib/test_decoding/expected/replorigin.out | 43 +++++++++++++++++++++++++++
 contrib/test_decoding/sql/replorigin.sql      | 18 +++++++++++
 contrib/test_decoding/test_decoding.c         | 37 +++++++++++++++++++++--
 src/backend/replication/logical/decode.c      |  1 +
 4 files changed, 96 insertions(+), 3 deletions(-)

diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..eba8da2 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -127,6 +127,49 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (3 rows)
 
+-- Make sure remote-originated tx's are not filtered out when only-local is unset
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup 
+-------------------------------------
+ 
+(1 row)
+
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+COMMIT;
+-- and empty tx's
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+ pg_replication_origin_xact_setup 
+----------------------------------
+ 
+(1 row)
+
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+                                                      data                                                      
+----------------------------------------------------------------------------------------------------------------
+ BEGIN (from "test_decoding: regression_slot" at lsn 0/AABBCCFF)
+ table public.origin_tbl: INSERT: id[integer]:4 data[text]:'will be replicated even though remotely originated'
+ COMMIT
+ BEGIN (from "test_decoding: regression_slot" at lsn 1/AABBCCFF)
+ COMMIT
+(5 rows)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset 
+-------------------------------------
+ 
+(1 row)
+
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 --------------------------
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..9115d34 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -60,5 +60,23 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
 INSERT INTO origin_tbl(data) VALUES ('will be replicated');
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1',  'only-local', '1');
 
+-- Make sure remote-originated tx's are not filtered out when only-local is unset
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+BEGIN;
+SELECT pg_replication_origin_xact_setup('0/aabbccff', '2013-01-01 00:10');
+INSERT INTO origin_tbl(data) VALUES ('will be replicated even though remotely originated');
+COMMIT;
+
+-- and empty tx's
+BEGIN;
+SELECT pg_replication_origin_xact_setup('1/aabbccff', '2013-01-01 00:20');
+CREATE TEMPORARY TABLE test_empty_tx(blah integer);
+COMMIT;
+
+-- Decode with options not otherwise tested - skip empty xacts on, only-local off. Can't include xids since they'll change each run.
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '0',  'only-local', '0', 'include-origins', '1');
+
+SELECT pg_replication_origin_session_reset();
+
 SELECT pg_drop_replication_slot('regression_slot');
 SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 80fc5f4..e041154 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -45,6 +45,7 @@ typedef struct
 	bool		skip_empty_xacts;
 	bool		xact_wrote_changes;
 	bool		only_local;
+	bool		include_origins;
 } TestDecodingData;
 
 static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -103,6 +104,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 	data->include_timestamp = false;
 	data->skip_empty_xacts = false;
 	data->only_local = false;
+	data->include_origins = false;
 
 	ctx->output_plugin_private = data;
 
@@ -172,6 +174,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
 						 strVal(elem->arg), elem->defname)));
 		}
+		else if (strcmp(elem->defname, "include-origins") == 0)
+		{
+
+			if (elem->arg == NULL)
+				data->include_origins = true;
+			else if (!parse_bool(strVal(elem->arg), &data->include_origins))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				  errmsg("could not parse value \"%s\" for parameter \"%s\"",
+						 strVal(elem->arg), elem->defname)));
+		}
 		else
 		{
 			ereport(ERROR,
@@ -203,17 +216,33 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
 	if (data->skip_empty_xacts)
 		return;
 
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	pg_output_begin(ctx, data, txn, true);
 }
 
 static void
 pg_output_begin(LogicalDecodingContext *ctx, TestDecodingData *data, ReorderBufferTXN *txn, bool last_write)
 {
+	char *origin = NULL;
+
 	OutputPluginPrepareWrite(ctx, last_write);
+
+	appendStringInfoString(ctx->out, "BEGIN");
+
 	if (data->include_xids)
-		appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
-	else
-		appendStringInfoString(ctx->out, "BEGIN");
+		appendStringInfo(ctx->out, " %u", txn->xid);
+
+	if (data->include_origins && txn->origin_id != InvalidRepOriginId)
+	{
+		if (!replorigin_by_oid(txn->origin_id, true, &origin))
+			origin = "unknown node";
+		appendStringInfo(ctx->out, " (from \"%s\" at lsn %X/%X)",
+				origin,
+				(uint32)(txn->origin_lsn >> 32),
+				(uint32)(txn->origin_lsn));
+	}
+
 	OutputPluginWrite(ctx, last_write);
 }
 
@@ -237,6 +266,8 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 		appendStringInfo(ctx->out, " (at %s)",
 						 timestamptz_to_str(txn->commit_time));
 
+	Assert(txn->origin_id != DoNotReplicateId);
+
 	OutputPluginWrite(ctx, true);
 }
 
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c629da3..9044226 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -455,6 +455,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
 
 	if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN)
 	{
+		origin_id = XLogRecGetOrigin(buf->record);
 		origin_lsn = parsed->origin_lsn;
 		commit_time = parsed->origin_timestamp;
 	}
-- 
2.1.0

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to