Hi,

here is updated version of this patch, calling the messages logical (decoding) messages consistently everywhere and removing any connection to standby messages. Moving this to it's own module gave me place to write some brief explanation about this so the code documentation has hopefully improved as well.

The functionality itself didn't change.

--
 Petr Jelinek                  http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services
From c19d7274091baee4a523333b1fa0ac684ace4cc9 Mon Sep 17 00:00:00 2001
From: Petr Jelinek <pjmodos@pjmodos.net>
Date: Fri, 22 Jan 2016 20:15:22 +0100
Subject: [PATCH] Logical Messages

---
 contrib/test_decoding/Makefile                  |   2 +-
 contrib/test_decoding/test_decoding.c           |  22 +++-
 doc/src/sgml/func.sgml                          |  42 ++++++++
 doc/src/sgml/logicaldecoding.sgml               |  22 ++++
 src/backend/access/rmgrdesc/Makefile            |   4 +-
 src/backend/access/rmgrdesc/logicalmsgdesc.c    |  33 ++++++
 src/backend/access/transam/rmgr.c               |   1 +
 src/backend/replication/logical/Makefile        |   2 +-
 src/backend/replication/logical/decode.c        |  37 +++++++
 src/backend/replication/logical/logical.c       |  38 +++++++
 src/backend/replication/logical/logicalfuncs.c  |  27 +++++
 src/backend/replication/logical/message.c       | 132 ++++++++++++++++++++++++
 src/backend/replication/logical/reorderbuffer.c |  73 +++++++++++++
 src/bin/pg_xlogdump/rmgrdesc.c                  |   1 +
 src/include/access/rmgrlist.h                   |   1 +
 src/include/catalog/pg_proc.h                   |   4 +
 src/include/replication/logicalfuncs.h          |   2 +
 src/include/replication/message.h               |  42 ++++++++
 src/include/replication/output_plugin.h         |  13 +++
 src/include/replication/reorderbuffer.h         |  21 ++++
 20 files changed, 514 insertions(+), 5 deletions(-)
 create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
 create mode 100644 src/backend/replication/logical/message.c
 create mode 100644 src/include/replication/message.h

diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index a362e69..8fdcfbc 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -38,7 +38,7 @@ submake-test_decoding:
 	$(MAKE) -C $(top_builddir)/contrib/test_decoding
 
 REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
-	binary prepared replorigin
+	binary prepared replorigin messages
 
 regresscheck: | submake-regress submake-test_decoding temp-install
 	$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 4cf808f..f655355 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
 
 #include "replication/output_plugin.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 
 #include "utils/builtins.h"
@@ -63,11 +64,15 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
 				 ReorderBufferChange *change);
 static bool pg_decode_filter(LogicalDecodingContext *ctx,
 				 RepOriginId origin_id);
+static void pg_decode_message(LogicalDecodingContext *ctx,
+							  ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+							  bool transactional, const char *prefix,
+							  Size sz, const char *message);
 
 void
 _PG_init(void)
 {
-	/* other plugins can perform things here */
+	RegisterLogicalMsgPrefix("test");
 }
 
 /* specify output plugin callbacks */
@@ -82,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
 	cb->commit_cb = pg_decode_commit_txn;
 	cb->filter_by_origin_cb = pg_decode_filter;
 	cb->shutdown_cb = pg_decode_shutdown;
+	cb->message_cb = pg_decode_message;
 }
 
 
@@ -471,3 +477,17 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
 
 	OutputPluginWrite(ctx, true);
 }
+
+static void
+pg_decode_message(LogicalDecodingContext *ctx,
+				  ReorderBufferTXN *txn, XLogRecPtr lsn,
+				  bool transactional, const char *prefix,
+				  Size sz, const char *message)
+{
+	OutputPluginPrepareWrite(ctx, true);
+	appendStringInfo(ctx->out, "message: lsn: %X/%X transactional: %d prefix: %s, sz: %zu content:",
+					 (uint32)(lsn >> 32), (uint32)lsn, transactional, prefix,
+					 sz);
+	appendBinaryStringInfo(ctx->out, message, sz);
+	OutputPluginWrite(ctx, true);
+}
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 4d2b88f..d55a2b1 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -17639,6 +17639,48 @@ postgres=# SELECT * FROM pg_xlogfile_name_offset(pg_stop_backup());
        </entry>
       </row>
 
+      <row>
+       <entry id="pg-logical-send-message-text">
+        <indexterm>
+         <primary>pg_logical_send_message</primary>
+        </indexterm>
+        <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Write text logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written and decoded
+        immediately. The <parameter>prefix</parameter> has to be prefix which
+        was registered by a plugin. The <parameter>content</parameter> is
+        content of the message.
+       </entry>
+      </row>
+
+      <row>
+       <entry id="pg-logical-send-message-bytea">
+        <indexterm>
+         <primary>pg_logical_send_message</primary>
+        </indexterm>
+        <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>bytea</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Write binary logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written and decoded
+        immediately. The <parameter>prefix</parameter> has to be prefix which
+        was registered by a plugin. The <parameter>content</parameter> is
+        content of the message.
+       </entry>
+      </row>
+
      </tbody>
     </tgroup>
    </table>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index 1ae5eb6..1d28de0 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -363,6 +363,7 @@ typedef struct OutputPluginCallbacks
     LogicalDecodeBeginCB begin_cb;
     LogicalDecodeChangeCB change_cb;
     LogicalDecodeCommitCB commit_cb;
+    LogicalDecodeMessageCB message_cb;
     LogicalDecodeFilterByOriginCB filter_by_origin_cb;
     LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
@@ -602,6 +603,27 @@ typedef bool (*LogicalDecodeFilterByOriginCB) (
        more efficient.
      </para>
      </sect3>
+
+    <sect3 id="logicaldecoding-output-plugin-message">
+     <title>Generic Message Callback</title>
+
+     <para>
+      The optional <function>message_cb</function> callback is called whenever
+      a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (
+    struct LogicalDecodingContext *,
+    ReorderBufferTXN *txn,
+    XLogRecPtr message_lsn,
+    bool transactional,
+    const char *prefix,
+    Size message_size,
+    const char *message
+);
+</programlisting>
+     </para>
+    </sect3>
+
    </sect2>
 
    <sect2 id="logicaldecoding-output-plugin-output">
diff --git a/src/backend/access/rmgrdesc/Makefile b/src/backend/access/rmgrdesc/Makefile
index c72a1f2..723b4d8 100644
--- a/src/backend/access/rmgrdesc/Makefile
+++ b/src/backend/access/rmgrdesc/Makefile
@@ -9,8 +9,8 @@ top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
 OBJS = brindesc.o clogdesc.o committsdesc.o dbasedesc.o gindesc.o gistdesc.o \
-	   hashdesc.o heapdesc.o mxactdesc.o nbtdesc.o relmapdesc.o \
-	   replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
+	   hashdesc.o heapdesc.o logicalmsgdesc.o mxactdesc.o nbtdesc.o \
+	   relmapdesc.o replorigindesc.o seqdesc.o smgrdesc.o spgdesc.o \
 	   standbydesc.o tblspcdesc.o xactdesc.o xlogdesc.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/access/rmgrdesc/logicalmsgdesc.c b/src/backend/access/rmgrdesc/logicalmsgdesc.c
new file mode 100644
index 0000000..739ee10
--- /dev/null
+++ b/src/backend/access/rmgrdesc/logicalmsgdesc.c
@@ -0,0 +1,33 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalmsgdesc.c
+ *	  rmgr descriptor routines for replication/logical/message.c
+ *
+ * Portions Copyright (c) 2015-2016, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/access/rmgrdesc/logicalmsgdesc.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "replication/message.h"
+
+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+	char			   *rec = XLogRecGetData(record);
+	xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+	appendStringInfo(buf, "%s message size %zu bytes",
+					 xlrec->transactional ? "transactional" : "nontransactional",
+					 xlrec->message_size);
+}
+
+const char *
+logicalmsg_identify(uint8 info)
+{
+	return NULL;
+}
diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index 7c4d773..1a42121 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -23,6 +23,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
 #include "utils/relmapper.h"
diff --git a/src/backend/replication/logical/Makefile b/src/backend/replication/logical/Makefile
index 8adea13..1d7ca06 100644
--- a/src/backend/replication/logical/Makefile
+++ b/src/backend/replication/logical/Makefile
@@ -14,7 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 override CPPFLAGS := -I$(srcdir) $(CPPFLAGS)
 
-OBJS = decode.o logical.o logicalfuncs.o reorderbuffer.o origin.o \
+OBJS = decode.o logical.o logicalfuncs.o message.o origin.o reorderbuffer.o \
 	snapbuild.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 88c3a49..110f958 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -39,6 +39,7 @@
 
 #include "replication/decode.h"
 #include "replication/logical.h"
+#include "replication/message.h"
 #include "replication/reorderbuffer.h"
 #include "replication/origin.h"
 #include "replication/snapbuild.h"
@@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
 
 /* individual record(group)'s handlers */
 static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -115,6 +117,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 			DecodeHeapOp(ctx, &buf);
 			break;
 
+		case RM_LOGICALMSG_ID:
+			DecodeLogicalMsgOp(ctx, &buf);
+			break;
+
 			/*
 			 * Rmgrs irrelevant for logical decoding; they describe stuff not
 			 * represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -431,6 +437,37 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 	}
 }
 
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+	SnapBuild  *builder = ctx->snapshot_builder;
+	XLogReaderState *r = buf->record;
+	uint8		info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+	xl_logical_message *message;
+
+	if (info != XLOG_LOGICAL_MESSAGE)
+		elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+	message = (xl_logical_message *) XLogRecGetData(r);
+
+	if (message->transactional &&
+		!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
+		return;
+	else if(!message->transactional &&
+			(SnapBuildCurrentState(ctx->snapshot_builder) != SNAPBUILD_CONSISTENT ||
+			 SnapBuildXactNeedsSkip(builder, buf->origptr)))
+		return;
+
+	ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
+							  buf->endptr, message->transactional,
+							  message->message, /* first part of message is prefix */
+							  message->message_size,
+							  message->message + message->prefix_size);
+}
+
 static inline bool
 FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
 {
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 2e6d3f9..998fb79 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -62,6 +62,9 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  XLogRecPtr commit_lsn);
 static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
 				  Relation relation, ReorderBufferChange *change);
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+				  XLogRecPtr message_lsn, bool transactional,
+				  const char *prefix, Size sz, const char *message);
 
 static void LoadOutputPlugin(OutputPluginCallbacks *callbacks, char *plugin);
 
@@ -178,6 +181,7 @@ StartupDecodingContext(List *output_plugin_options,
 	ctx->reorder->begin = begin_cb_wrapper;
 	ctx->reorder->apply_change = change_cb_wrapper;
 	ctx->reorder->commit = commit_cb_wrapper;
+	ctx->reorder->message = message_cb_wrapper;
 
 	ctx->out = makeStringInfo();
 	ctx->prepare_write = prepare_write;
@@ -702,6 +706,40 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 	return ret;
 }
 
+static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+							   XLogRecPtr message_lsn,
+							   bool transactional, const char *prefix,
+							   Size sz, const char *message)
+{
+	LogicalDecodingContext *ctx = cache->private_data;
+	LogicalErrorCallbackState state;
+	ErrorContextCallback errcallback;
+
+	if (ctx->callbacks.message_cb == NULL)
+		return;
+
+	/* Push callback + info on the error context stack */
+	state.ctx = ctx;
+	state.callback_name = "message";
+	state.report_location = message_lsn; /* beginning of commit record */
+	errcallback.callback = output_plugin_error_callback;
+	errcallback.arg = (void *) &state;
+	errcallback.previous = error_context_stack;
+	error_context_stack = &errcallback;
+
+	/* set output state */
+	ctx->accept_writes = true;
+	ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
+	ctx->write_location = message_lsn;
+
+	/* do the actual work: call callback */
+	ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional,
+							  prefix, sz, message);
+
+	/* Pop the error context stack */
+	error_context_stack = errcallback.previous;
+}
+
 /*
  * Set the required catalog xmin horizon for historic snapshots in the current
  * replication slot.
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index f789fc1..2eb26d4 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -24,6 +24,8 @@
 #include "access/xlog_internal.h"
 #include "access/xlogutils.h"
 
+#include "access/xact.h"
+
 #include "catalog/pg_type.h"
 
 #include "nodes/makefuncs.h"
@@ -41,6 +43,7 @@
 #include "replication/decode.h"
 #include "replication/logical.h"
 #include "replication/logicalfuncs.h"
+#include "replication/message.h"
 
 #include "storage/fd.h"
 
@@ -363,3 +366,27 @@ pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS)
 {
 	return pg_logical_slot_get_changes_guts(fcinfo, false, true);
 }
+
+
+/*
+ * SQL function returning the changestream in binary, only peeking ahead.
+ */
+Datum
+pg_logical_send_message_bytea(PG_FUNCTION_ARGS)
+{
+	bool		transactional = PG_GETARG_BOOL(0);
+	char	   *prefix = text_to_cstring(PG_GETARG_TEXT_PP(1));
+	bytea	   *data = PG_GETARG_BYTEA_PP(2);
+	XLogRecPtr	lsn;
+
+	lsn = LogLogicalMessage(prefix, VARDATA_ANY(data), VARSIZE_ANY_EXHDR(data),
+							transactional);
+	PG_RETURN_LSN(lsn);
+}
+
+Datum
+pg_logical_send_message_text(PG_FUNCTION_ARGS)
+{
+	/* bytea and text are compatible */
+	return pg_logical_send_message_bytea(fcinfo);
+}
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
new file mode 100644
index 0000000..e5f4feb
--- /dev/null
+++ b/src/backend/replication/logical/message.c
@@ -0,0 +1,132 @@
+/*-------------------------------------------------------------------------
+ *
+ * message.c
+ *	  Generic logical messages.
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/logical/message.c
+ *
+ * NOTES
+ *
+ * Generic logical messages allow XLOG logging of arbitrary binary blobs that
+ * get passed to the logical decoding plugin. In normal XLOG processing they
+ * are same as NOOP.
+ *
+ * These messages can be either transactional or non-transactional.
+ * Transactional messages are part of current transaction and will be sent to
+ * decoding plugin using in a same way as DML operations.
+ * Non-transactional messages are sent to the plugin at the time when the
+ * logical decoding reads them from XLOG.
+ *
+ * Every message carries prefix to avoid conflicts between different decoding
+ * plugins. The prefix has to be registered before the message using that
+ * prefix can be written to XLOG. The prefix can be registered exactly once to
+ * avoid situation where multiple third party extensions try to use same
+ * prefix.
+ *
+ * ---------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+
+#include "catalog/indexing.h"
+
+#include "nodes/execnodes.h"
+
+#include "replication/message.h"
+#include "replication/logical.h"
+
+#include "utils/memutils.h"
+
+/* List of registered logical message prefixes. */
+static List	   *LogicalMsgPrefixList = NIL;
+
+XLogRecPtr
+LogLogicalMessage(const char *prefix, const char *message, size_t size,
+				  bool transactional)
+{
+	ListCell		   *lc;
+	bool				found = false;
+	xl_logical_message	xlrec;
+
+	/* Check if the provided prefix is known to us. */
+	foreach(lc, LogicalMsgPrefixList)
+	{
+		char	*mp = lfirst(lc);
+
+		if (strcmp(prefix, mp) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+	if (!found)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("standby message prefix \"%s\" is not registered",
+						prefix)));
+
+	/*
+	 * Force xid to be allocated if we're sending a transactional message.
+	 */
+	if (transactional)
+	{
+		Assert(IsTransactionState());
+		GetCurrentTransactionId();
+	}
+
+	xlrec.transactional = transactional;
+	xlrec.prefix_size = strlen(prefix) + 1;
+	xlrec.message_size = size;
+
+	XLogBeginInsert();
+	XLogRegisterData((char *) &xlrec, SizeOfLogicalMessage);
+	XLogRegisterData((char *) prefix, xlrec.prefix_size);
+	XLogRegisterData((char *) message, size);
+
+	return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
+}
+
+void
+RegisterLogicalMsgPrefix(const char *prefix)
+{
+	ListCell	   *lc;
+	bool			found = false;
+	MemoryContext	oldcxt;
+
+	/* Check for duplicit registrations. */
+	foreach(lc, LogicalMsgPrefixList)
+	{
+		char	*mp = lfirst(lc);
+
+		if (strcmp(prefix, mp) == 0)
+		{
+			found = true;
+			break;
+		}
+	}
+	if (found)
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("logical decoding message prefix \"%s\" is already registered",
+						prefix)));
+
+	oldcxt = MemoryContextSwitchTo(TopMemoryContext);
+	LogicalMsgPrefixList = lappend(LogicalMsgPrefixList, pstrdup(prefix));
+	MemoryContextSwitchTo(oldcxt);
+}
+
+void
+logicalmsg_redo(XLogReaderState *record)
+{
+	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
+
+	if (info != XLOG_LOGICAL_MESSAGE)
+			elog(PANIC, "logicalmsg_redo: unknown op code %u", info);
+
+	/* This is only interesting for logical decoding, see decode.c. */
+}
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 78acced..99e9ec7 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -414,6 +414,11 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
 				change->data.tp.oldtuple = NULL;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			if (change->data.msg.message != NULL)
+				pfree(change->data.msg.message);
+			change->data.msg.message = NULL;
+			break;
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			if (change->data.snapshot)
 			{
@@ -599,6 +604,39 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
 	ReorderBufferCheckSerializeTXN(rb, txn);
 }
 
+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+						  bool transactional, const char *prefix, Size msg_sz,
+						  const char *msg)
+{
+	ReorderBufferTXN *txn = NULL;
+
+	if (transactional)
+	{
+		ReorderBufferChange *change;
+
+		txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+		Assert(xid != InvalidTransactionId);
+		Assert(txn != NULL);
+
+		change = ReorderBufferGetChange(rb);
+		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+		change->data.msg.transactional = true;
+		change->data.msg.prefix = pstrdup(prefix);
+		change->data.msg.message_size = msg_sz;
+		change->data.msg.message = palloc(msg_sz);
+		memcpy(change->data.msg.message, msg, msg_sz);
+
+		ReorderBufferQueueChange(rb, xid, lsn, change);
+	}
+	else
+	{
+		rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
+	}
+}
+
+
 static void
 AssertTXNLsnOrder(ReorderBuffer *rb)
 {
@@ -1465,6 +1503,14 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
 					specinsert = change;
 					break;
 
+				case REORDER_BUFFER_CHANGE_MESSAGE:
+					rb->message(rb, txn, change->lsn,
+								change->data.msg.transactional,
+								change->data.msg.prefix,
+								change->data.msg.message_size,
+								change->data.msg.message);
+					break;
+
 				case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 					/* get rid of the old */
 					TeardownHistoricSnapshot(false);
@@ -2117,6 +2163,21 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				}
 				break;
 			}
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			{
+				char	   *data;
+				size_t		prefix_size = strlen(change->data.msg.prefix) + 1;
+
+				sz += prefix_size + change->data.msg.message_size;
+				ReorderBufferSerializeReserve(rb, sz);
+
+				data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
+				memcpy(data, change->data.msg.prefix,
+					   prefix_size);
+				memcpy(data + prefix_size, change->data.msg.message,
+					   change->data.msg.message_size);
+				break;
+			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	snap;
@@ -2354,6 +2415,18 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 				data += len;
 			}
 			break;
+		case REORDER_BUFFER_CHANGE_MESSAGE:
+			{
+				Size		message_size = change->data.msg.message_size;
+				Size		prefix_size = strlen(data) + 1;
+
+				change->data.msg.prefix = pstrdup(data);
+				change->data.msg.message = palloc(message_size);
+				memcpy(change->data.msg.message, data + prefix_size,
+					   message_size);
+
+				data += prefix_size + message_size;
+			}
 		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
 			{
 				Snapshot	oldsnap;
diff --git a/src/bin/pg_xlogdump/rmgrdesc.c b/src/bin/pg_xlogdump/rmgrdesc.c
index f9cd395..6ba7f22 100644
--- a/src/bin/pg_xlogdump/rmgrdesc.c
+++ b/src/bin/pg_xlogdump/rmgrdesc.c
@@ -25,6 +25,7 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "replication/message.h"
 #include "replication/origin.h"
 #include "rmgrdesc.h"
 #include "storage/standbydefs.h"
diff --git a/src/include/access/rmgrlist.h b/src/include/access/rmgrlist.h
index fab912d..35c242d 100644
--- a/src/include/access/rmgrlist.h
+++ b/src/include/access/rmgrlist.h
@@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
 PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
 PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
 PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
+PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 244aa4d..8da6fae 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5066,6 +5066,10 @@ DATA(insert OID = 3784 (  pg_logical_slot_peek_changes PGNSP PGUID 12 1000 1000
 DESCR("peek at changes from replication slot");
 DATA(insert OID = 3785 (  pg_logical_slot_peek_binary_changes PGNSP PGUID 12 1000 1000 25 0 f f f f f t v u 4 0 2249 "19 3220 23 1009" "{19,3220,23,1009,3220,28,17}" "{i,i,i,v,o,o,o}" "{slot_name,upto_lsn,upto_nchanges,options,location,xid,data}" _null_ _null_ pg_logical_slot_peek_binary_changes _null_ _null_ _null_ ));
 DESCR("peek at binary changes from replication slot");
+DATA(insert OID = 3577 (  pg_logical_send_message PGNSP PGUID 12 1 0 0 0 f f f f f f v u 3 0 3220 "16 25 25" _null_ _null_ _null_ _null_ _null_ pg_logical_send_message_text _null_ _null_ _null_ ));
+DESCR("send a textual message");
+DATA(insert OID = 3578 (  pg_logical_send_message PGNSP PGUID 12 1 0 0 0 f f f f f f v u 3 0 3220 "16 25 17" _null_ _null_ _null_ _null_ _null_ pg_logical_send_message_bytea _null_ _null_ _null_ ));
+DESCR("send a binary message");
 
 /* event triggers */
 DATA(insert OID = 3566 (  pg_event_trigger_dropped_objects		PGNSP PGUID 12 10 100 0 0 f f f f t t s s 0 0 2249 "" "{26,26,23,16,16,16,25,25,25,25,1009,1009}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{classid, objid, objsubid, original, normal, is_temporary, object_type, schema_name, object_name, object_identity, address_names, address_args}" _null_ _null_ pg_event_trigger_dropped_objects _null_ _null_ _null_ ));
diff --git a/src/include/replication/logicalfuncs.h b/src/include/replication/logicalfuncs.h
index c87a1df..cd3373e 100644
--- a/src/include/replication/logicalfuncs.h
+++ b/src/include/replication/logicalfuncs.h
@@ -21,4 +21,6 @@ extern Datum pg_logical_slot_get_binary_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_changes(PG_FUNCTION_ARGS);
 extern Datum pg_logical_slot_peek_binary_changes(PG_FUNCTION_ARGS);
 
+extern Datum pg_logical_send_message_bytea(PG_FUNCTION_ARGS);
+extern Datum pg_logical_send_message_text(PG_FUNCTION_ARGS);
 #endif
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
new file mode 100644
index 0000000..e4223f4
--- /dev/null
+++ b/src/include/replication/message.h
@@ -0,0 +1,42 @@
+/*-------------------------------------------------------------------------
+ * message.h
+ *	   Exports from replication/logical/message.c
+ *
+ * Copyright (c) 2013-2016, PostgreSQL Global Development Group
+ *
+ * src/include/replication/message.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_LOGICAL_MESSAGE_H
+#define PG_LOGICAL_MESSAGE_H
+
+#include "access/xlog.h"
+#include "access/xlogdefs.h"
+#include "access/xlogreader.h"
+
+/*
+ * Generic logical decoding message wal record.
+ */
+typedef struct xl_logical_message
+{
+	bool		transactional;					/* is message transactional? */
+	size_t		prefix_size;					/* length of prefix */
+	size_t		message_size;					/* size of the message */
+	char		message[FLEXIBLE_ARRAY_MEMBER];	/* message including the null
+												 * terminated prefx of length
+												 * prefix_size */
+} xl_logical_message;
+
+#define SizeOfLogicalMessage	(offsetof(xl_logical_message, message))
+
+extern XLogRecPtr LogLogicalMessage(const char *prefix, const char *message,
+									size_t size, bool transactional);
+extern void RegisterLogicalMsgPrefix(const char *prefix);
+
+/* RMGR API*/
+#define XLOG_LOGICAL_MESSAGE	0x00
+void		logicalmsg_redo(XLogReaderState *record);
+void		logicalmsg_desc(StringInfo buf, XLogReaderState *record);
+const char *logicalmsg_identify(uint8 info);
+
+#endif   /* PG_LOGICAL_MESSAGE_H */
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 577b12e..3a2ca98 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -74,6 +74,18 @@ typedef void (*LogicalDecodeCommitCB) (
 												   XLogRecPtr commit_lsn);
 
 /*
+ * Called for the generic logical decoding messages.
+ */
+typedef void (*LogicalDecodeMessageCB) (
+											 struct LogicalDecodingContext *,
+											 ReorderBufferTXN *txn,
+											 XLogRecPtr message_lsn,
+											 bool transactional,
+											 const char *prefix,
+											 Size message_size,
+											 const char *message);
+
+/*
  * Filter changes by origin.
  */
 typedef bool (*LogicalDecodeFilterByOriginCB) (
@@ -96,6 +108,7 @@ typedef struct OutputPluginCallbacks
 	LogicalDecodeBeginCB begin_cb;
 	LogicalDecodeChangeCB change_cb;
 	LogicalDecodeCommitCB commit_cb;
+	LogicalDecodeMessageCB message_cb;
 	LogicalDecodeFilterByOriginCB filter_by_origin_cb;
 	LogicalDecodeShutdownCB shutdown_cb;
 } OutputPluginCallbacks;
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2abee0a..f51f5c1 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -51,6 +51,7 @@ enum ReorderBufferChangeType
 	REORDER_BUFFER_CHANGE_INSERT,
 	REORDER_BUFFER_CHANGE_UPDATE,
 	REORDER_BUFFER_CHANGE_DELETE,
+	REORDER_BUFFER_CHANGE_MESSAGE,
 	REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT,
 	REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID,
 	REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID,
@@ -95,6 +96,14 @@ typedef struct ReorderBufferChange
 			ReorderBufferTupleBuf *newtuple;
 		}			tp;
 
+		struct
+		{
+			bool transactional;
+			char *prefix;
+			size_t message_size;
+			char *message;
+		} msg;
+
 		/* New snapshot, set when action == *_INTERNAL_SNAPSHOT */
 		Snapshot	snapshot;
 
@@ -271,6 +280,15 @@ typedef void (*ReorderBufferCommitCB) (
 												   ReorderBufferTXN *txn,
 												   XLogRecPtr commit_lsn);
 
+/* message callback signature */
+typedef void (*ReorderBufferMessageCB) (
+												   ReorderBuffer *rb,
+												   ReorderBufferTXN *txn,
+												   XLogRecPtr message_lsn,
+												   bool transactional,
+												   const char *prefix, Size sz,
+												   const char *message);
+
 struct ReorderBuffer
 {
 	/*
@@ -297,6 +315,7 @@ struct ReorderBuffer
 	ReorderBufferBeginCB begin;
 	ReorderBufferApplyChangeCB apply_change;
 	ReorderBufferCommitCB commit;
+	ReorderBufferMessageCB message;
 
 	/*
 	 * Pointer that will be passed untouched to the callbacks.
@@ -347,6 +366,8 @@ ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
 void		ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
 
 void		ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void		ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
+									  bool transactional, const char *prefix, Size msg_sz, const char *msg);
 void ReorderBufferCommit(ReorderBuffer *, TransactionId,
 					XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
 	  TimestampTz commit_time, RepOriginId origin_id, XLogRecPtr origin_lsn);
-- 
1.9.1

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to