On 07/04/16 12:26, Andres Freund wrote:
Hi,

On 2016-04-06 20:03:20 +0200, Petr Jelinek wrote:
The attached patch adds filtering by both database and origin. I added tests with
slightly less hardcoding than what you proposed above.

Not a fan of creating & dropping another database - that's really rather
expensive. And we're in a target that doesn't support installcheck, so
relying on template1's existence seems fine...


Makes sense, changed that bit.


diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index 70130e9..a5b13c5 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -1,6 +1,5 @@
  -- predictability
  SET synchronous_commit = on;
-SET client_encoding = 'utf8';

I guess that's just from the previous test with Czech text?


Yeah, it's cleanup after commit d25379eb23383f1d2f969e65e0332b47c19aea94.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services
diff --git a/contrib/test_decoding/expected/messages.out b/contrib/test_decoding/expected/messages.out
index 70130e9..c75d401 100644
--- a/contrib/test_decoding/expected/messages.out
+++ b/contrib/test_decoding/expected/messages.out
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
  ?column? 
 ----------
@@ -71,9 +70,30 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'for
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
 (7 rows)
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
  ?column? 
 ----------
- init
+ otherdb1
+(1 row)
+
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+ ?column? 
+----------
+ otherdb2
+(1 row)
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
+ ?column? 
+----------
+ cleanup
 (1 row)
 
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
index c0f5125..8e8889d 100644
--- a/contrib/test_decoding/expected/replorigin.out
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -59,6 +59,12 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 ERROR:  cannot setup replication origin when one is already setup
+SELECT 'this message will not be decoded' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+             ?column?             
+----------------------------------
+ this message will not be decoded
+(1 row)
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
diff --git a/contrib/test_decoding/sql/messages.sql b/contrib/test_decoding/sql/messages.sql
index 4aedb04..cf3f773 100644
--- a/contrib/test_decoding/sql/messages.sql
+++ b/contrib/test_decoding/sql/messages.sql
@@ -1,6 +1,5 @@
 -- predictability
 SET synchronous_commit = on;
-SET client_encoding = 'utf8';
 
 SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
 
@@ -22,4 +21,14 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
 
-SELECT 'init' FROM pg_drop_replication_slot('regression_slot');
+-- test db filtering
+\set prevdb :DBNAME
+\c template1
+
+SELECT 'otherdb1' FROM pg_logical_emit_message(false, 'test', 'otherdb1');
+SELECT 'otherdb2' FROM pg_logical_emit_message(true, 'test', 'otherdb2');
+
+\c :prevdb
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+
+SELECT 'cleanup' FROM pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
index e12404e..a33e460bb 100644
--- a/contrib/test_decoding/sql/replorigin.sql
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -31,6 +31,8 @@ SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 -- ensure we prevent duplicate setup
 SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
 
+SELECT 'this message will not be decoded' FROM pg_logical_emit_message(false, 'test', 'this message will not be decoded');
+
 BEGIN;
 -- setup transaction origin
 SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3e80c4a..0cdb0b8 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+	if (ctx->callbacks.filter_by_origin_cb == NULL)
+		return false;
+
+	return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
 /*
  * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
  */
@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	XLogReaderState *r = buf->record;
 	TransactionId	xid = XLogRecGetXid(r);
 	uint8			info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	RepOriginId		origin_id = XLogRecGetOrigin(r);
 	Snapshot		snapshot;
 	xl_logical_message *message;
 
@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
 	message = (xl_logical_message *) XLogRecGetData(r);
 
+	if (message->dbId != ctx->slot->data.database ||
+		FilterByOrigin(ctx, origin_id))
+		return;
+
 	if (message->transactional &&
 		!SnapBuildProcessChange(builder, xid, buf->origptr))
 		return;
@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 							  message->message + message->prefix_size);
 }
 
-static inline bool
-FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
-{
-	if (ctx->callbacks.filter_by_origin_cb == NULL)
-		return false;
-
-	return filter_by_origin_cb_wrapper(ctx, origin_id);
-}
-
 /*
  * Consolidated commit record handling between the different form of commit
  * records.
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
index 684f799..efcc25a 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/message.c
@@ -31,6 +31,8 @@
 
 #include "postgres.h"
 
+#include "miscadmin.h"
+
 #include "access/xact.h"
 
 #include "catalog/indexing.h"
@@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
 		GetCurrentTransactionId();
 	}
 
+	xlrec.dbId = MyDatabaseId;
 	xlrec.transactional = transactional;
 	xlrec.prefix_size = strlen(prefix) + 1;
 	xlrec.message_size = size;
@@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
 	XLogRegisterData((char *) prefix, xlrec.prefix_size);
 	XLogRegisterData((char *) message, size);
 
+	/* allow origin filtering */
+	XLogIncludeOrigin();
+
 	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
 }
 
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
index 8b968d5..c9fd477 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/message.h
@@ -19,6 +19,7 @@
  */
 typedef struct xl_logical_message
 {
+	Oid         dbId;							/* Oid of the database */
 	bool		transactional;					/* is message transactional? */
 	Size		prefix_size;					/* length of prefix */
 	Size		message_size;					/* size of the message */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to