On 2016-03-21 18:10:55 +0100, Petr Jelinek wrote:
> Just noticed there is missing symlink in the pg_xlogdump.
> create mode 100644 src/backend/access/rmgrdesc/logicalmsgdesc.c
> create mode 120000 src/bin/pg_xlogdump/logicalmsgdesc.c

Uh, src/bin/pg_xlogdump/logicalmsgdesc.c shouldn't be there. The symlink
is supposed to be automatically created by the Makefile.

Were you perhaps confused because it showed up in git status? If so,
that's probably because it isn't in src/bin/pg_xlogdump/.gitignore.
Perhaps we should change that file to ignore *desc.c?

> +      <row>
> +       <entry id="pg-logical-emit-message-text">
> +        <indexterm>
> +         <primary>pg_logical_emit_message</primary>
> +        </indexterm>
> +        <literal><function>pg_logical_emit_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
> +       </entry>
> +       <entry>
> +        void
> +       </entry>
> +       <entry>
> +        Write text logical decoding message. This can be used to pass generic
> +        messages to logical decoding plugins through WAL. The parameter
> +        <parameter>transactional</parameter> specifies if the message should
> +        be part of current transaction or if it should be written immediately
> +        and decoded as soon as the logical decoding reads the record. The
> +        <parameter>prefix</parameter> is textual prefix used by the logical
> +        decoding plugins to easily recognize interesting messages for them.
> +        The <parameter>content</parameter> is the text of the message.
> +       </entry>
> +      </row>

s/write/emit/?

> +
> +    <sect3 id="logicaldecoding-output-plugin-message">
> +     <title>Generic Message Callback</title>
> +
> +     <para>
> +      The optional <function>message_cb</function> callback is called whenever
> +      a logical decoding message has been decoded.
> +<programlisting>
> +typedef void (*LogicalDecodeMessageCB) (
> +    struct LogicalDecodingContext *,
> +    ReorderBufferTXN *txn,
> +    XLogRecPtr message_lsn,
> +    const char *prefix,
> +    Size message_size,
> +    const char *message
> +);

I see you removed the transactional parameter. I'm doubtful that that's
a good idea: It seems like it'd be rather helpful to pass the
transaction for a non-transactional message that's emitted while an xid
was assigned?

> +/*
> + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
> + */
> +static void
> +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
> +{
> +    SnapBuild  *builder = ctx->snapshot_builder;
> +    XLogReaderState *r = buf->record;
> +    uint8       info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
> +    xl_logical_message *message;
> +
> +    if (info != XLOG_LOGICAL_MESSAGE)
> +        elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
> +
> +    message = (xl_logical_message *) XLogRecGetData(r);
> +
> +    if (message->transactional)
> +    {
> +        if (!SnapBuildProcessChange(builder, XLogRecGetXid(r), buf->origptr))
> +            return;
> +
> +        ReorderBufferQueueMessage(ctx->reorder, XLogRecGetXid(r),
> +                                  buf->endptr,
> +                                  message->message, /* first part of message is prefix */
> +                                  message->message_size,
> +                                  message->message + message->prefix_size);
> +    }
> +    else if (SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT &&
> +             !SnapBuildXactNeedsSkip(builder, buf->origptr))
> +    {
> +        volatile Snapshot snapshot_now;
> +        ReorderBuffer *rb = ctx->reorder;
> +
> +        /* setup snapshot to allow catalog access */
> +        snapshot_now = SnapBuildGetOrBuildSnapshot(builder, XLogRecGetXid(r));
> +        SetupHistoricSnapshot(snapshot_now, NULL);
> +        rb->message(rb, NULL, buf->origptr, message->message,
> +                    message->message_size,
> +                    message->message + message->prefix_size);
> +        TeardownHistoricSnapshot(false);
> +    }
> +}

A number of things:
1) The SnapBuildProcessChange needs to be toplevel, not just for
   transactional
   messages - we can't yet necessarily build a snapshot.
2) I'm inclined to move even the non-transactional stuff to
   reorderbuffer.
3) This lacks error handling, we surely don't want to error out while
   still having the historic snapshot set up.
4) Without 3) the volatile is bogus.
5) Misses a ReorderBufferProcessXid() call.

> + * Every message carries prefix to avoid conflicts between different decoding
> + * plugins. The prefix has to be registered before the message using that
> + * prefix can be written to XLOG. The prefix can be registered exactly once to
> + * avoid situation where multiple third party extensions try to use same
> + * prefix.

Outdated afaics?

> @@ -414,6 +414,14 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
>                  change->data.tp.oldtuple = NULL;
>              }
>              break;
> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            if (change->data.msg.prefix != NULL)
> +                pfree(change->data.msg.prefix);
> +            change->data.msg.prefix = NULL;
> +            if (change->data.msg.message != NULL)
> +                pfree(change->data.msg.message);
> +            change->data.msg.message = NULL;
> +            break;

Hm, this will have some overhead, but I guess the messages won't be
super frequent, and usually not very large.

> +/*
> + * Queue message into a transaction so it can be processed upon commit.
> + */
> +void
> +ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
> +                          const char *prefix, Size msg_sz, const char *msg)
> +{
> +    ReorderBufferChange *change;
> +
> +    Assert(xid != InvalidTransactionId);
> +
> +    change = ReorderBufferGetChange(rb);
> +    change->action = REORDER_BUFFER_CHANGE_MESSAGE;
> +    change->data.msg.prefix = pstrdup(prefix);
> +    change->data.msg.message_size = msg_sz;
> +    change->data.msg.message = palloc(msg_sz);
> +    memcpy(change->data.msg.message, msg, msg_sz);
> +
> +    ReorderBufferQueueChange(rb, xid, lsn, change);
> +}

I'm not sure right now if there's any guarantee that the current memory
context is meaningful here?
IIRC other long-lived allocations explicitly use a context?

> +        case REORDER_BUFFER_CHANGE_MESSAGE:
> +            {
> +                char       *data;
> +                size_t      prefix_size = strlen(change->data.msg.prefix) + 1;
> +
> +                sz += prefix_size + change->data.msg.message_size;
> +                ReorderBufferSerializeReserve(rb, sz);
> +
> +                data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
> +                memcpy(data, change->data.msg.prefix,
> +                       prefix_size);
> +                memcpy(data + prefix_size, change->data.msg.message,
> +                       change->data.msg.message_size);
> +                break;
> +            }

Can you please include the sizes of the blocks explicitly, rather than
relying on 0 termination?

> @@ -45,3 +45,4 @@ PG_RMGR(RM_SPGIST_ID, "SPGist", spg_redo, spg_desc, spg_identify, spg_xlog_start
>  PG_RMGR(RM_BRIN_ID, "BRIN", brin_redo, brin_desc, brin_identify, NULL, NULL)
>  PG_RMGR(RM_COMMIT_TS_ID, "CommitTs", commit_ts_redo, commit_ts_desc, commit_ts_identify, NULL, NULL)
>  PG_RMGR(RM_REPLORIGIN_ID, "ReplicationOrigin", replorigin_redo, replorigin_desc, replorigin_identify, NULL, NULL)
> +PG_RMGR(RM_LOGICALMSG_ID, "LogicalMessage", logicalmsg_redo, logicalmsg_desc, logicalmsg_identify, NULL, NULL)

Did you consider doing this via the standby rmgr instead?

> +typedef struct xl_logical_message
> +{
> +    bool        transactional;  /* is message transactional? */
> +    size_t      prefix_size;    /* length of prefix */
> +    size_t      message_size;   /* size of the message */
> +    char        message[FLEXIBLE_ARRAY_MEMBER]; /* message including the null
> +                                                 * terminated prefx of length
> +                                                 * prefix_size */
> +} xl_logical_message;

"prefx".

Greetings,

Andres Freund

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
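
To illustrate the request above about storing block sizes explicitly
instead of relying on 0 termination, here is a minimal standalone
sketch. It is not code from the patch and does not use the
reorderbuffer.c structures; the layout and the names xmalloc,
msg_serialized_size, msg_serialize and msg_deserialize are hypothetical
and exist only for this example. The assumed layout is
[prefix_size][prefix][message_size][message], so the reader never has to
call strlen() on spilled data.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: allocate at least one byte, abort on failure. */
static void *
xmalloc(size_t size)
{
    void *p = malloc(size > 0 ? size : 1);

    if (p == NULL)
        abort();
    return p;
}

/* Bytes needed for [prefix_size][prefix][message_size][message]. */
static size_t
msg_serialized_size(size_t prefix_size, size_t message_size)
{
    return sizeof(size_t) + prefix_size + sizeof(size_t) + message_size;
}

/* Write both blocks, each preceded by its length; buf must be large enough. */
static void
msg_serialize(char *buf, const char *prefix, size_t prefix_size,
              const char *message, size_t message_size)
{
    memcpy(buf, &prefix_size, sizeof(size_t));
    buf += sizeof(size_t);
    memcpy(buf, prefix, prefix_size);
    buf += prefix_size;
    memcpy(buf, &message_size, sizeof(size_t));
    buf += sizeof(size_t);
    memcpy(buf, message, message_size);
}

/* Read both blocks back using only the stored lengths; caller frees them. */
static void
msg_deserialize(const char *buf, char **prefix, size_t *prefix_size,
                char **message, size_t *message_size)
{
    memcpy(prefix_size, buf, sizeof(size_t));
    buf += sizeof(size_t);
    *prefix = xmalloc(*prefix_size);
    memcpy(*prefix, buf, *prefix_size);
    buf += *prefix_size;
    memcpy(message_size, buf, sizeof(size_t));
    buf += sizeof(size_t);
    *message = xmalloc(*message_size);
    memcpy(*message, buf, *message_size);
}
```

With a layout along these lines, a message body containing embedded NUL
bytes round-trips unchanged, and the restore side does not need to
rescan the prefix to find where the message starts.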