Hi,

thanks for looking Andres,


On 27/02/16 01:05, Andres Freund wrote:
Hi,

I'm not really convinced by RegisterStandbyMsgPrefix() et al. There's
not much documentation about what it actually is supposed to
accomplish. Afaics you're basically forced to use
shared_preload_libraries with it right now?  Also, iterating through a
linked list every time something is logged doesn't seem very satisfying?


Well, my reasoning there was to stop multiple plugins from using the same prefix, and for that you need some kind of registry. Making this a shared catalog seemed like huge overkill given the potentially transient nature of output plugins, and this was the best I could come up with. And yes, it requires you to load your plugin before you can log a message for it.

I am not married to this solution, so if you have better ideas or if you think the clashes are not a real issue, we can change it.
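
To make the registry idea concrete, here is a minimal standalone sketch of a linked-list prefix registry that rejects duplicate prefixes. The names (PrefixEntry, register_prefix, prefix_is_registered) are hypothetical and are not taken from the patch; it also uses plain malloc instead of PostgreSQL memory contexts, so treat it as an illustration of the approach only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical registry node; not the structure used in the patch. */
typedef struct PrefixEntry
{
    char               *prefix;
    struct PrefixEntry *next;
} PrefixEntry;

static PrefixEntry *prefix_registry = NULL;

/*
 * Linear scan of the list. This is the per-message lookup cost that the
 * review comment above is concerned about.
 */
bool
prefix_is_registered(const char *prefix)
{
    PrefixEntry *entry;

    assert(prefix != NULL);

    for (entry = prefix_registry; entry != NULL; entry = entry->next)
    {
        if (strcmp(entry->prefix, prefix) == 0)
            return true;
    }
    return false;
}

/*
 * Register a prefix. Returns false if the prefix is already taken or if
 * memory allocation fails, true otherwise.
 */
bool
register_prefix(const char *prefix)
{
    PrefixEntry *entry;
    size_t       len;

    if (prefix_is_registered(prefix))
        return false;

    entry = malloc(sizeof(PrefixEntry));
    if (entry == NULL)
        return false;

    len = strlen(prefix) + 1;
    entry->prefix = malloc(len);
    if (entry->prefix == NULL)
    {
        free(entry);
        return false;
    }
    memcpy(entry->prefix, prefix, len);

    /* Push onto the head of the list. */
    entry->next = prefix_registry;
    prefix_registry = entry;
    return true;
}
```

With something like this, a second plugin calling register_prefix("test") after the first one would get false back, which is the clash detection the registry is meant to provide.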


On 2016-02-24 18:35:16 +0100, Petr Jelinek wrote:
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)

+SELECT 'msg1' FROM pg_logical_send_message(true, 'test', 'msg1');
+ ?column?
+----------
+ msg1
+(1 row)

Hm. Somehow 'sending' a message seems wrong here. Maybe 'emit'?

+      <row>
+       <entry id="pg-logical-send-message-text">
+        <indexterm>
+         <primary>pg_logical_send_message</primary>
+        </indexterm>
+        <literal><function>pg_logical_send_message(<parameter>transactional</parameter> <type>bool</type>, <parameter>prefix</parameter> <type>text</type>, <parameter>content</parameter> <type>text</type>)</function></literal>
+       </entry>
+       <entry>
+        void
+       </entry>
+       <entry>
+        Write text logical decoding message. This can be used to pass generic
+        messages to logical decoding plugins through WAL. The parameter
+        <parameter>transactional</parameter> specifies if the message should
+        be part of current transaction or if it should be written and decoded
+        immediately. The <parameter>prefix</parameter> has to be prefix which
+        was registered by a plugin. The <parameter>content</parameter> is
+        content of the message.
+       </entry>
+      </row>

It's not decoded immediately, even if emitted non-transactionally.


Okay, "immediately" is somewhat misleading. How does "should be written immediately and decoded as soon as logical decoding reads the WAL record" sound?

+    <sect3 id="logicaldecoding-output-plugin-message">
+     <title>Generic Message Callback</title>
+
+     <para>
+      The optional <function>message_cb</function> callback is called whenever
+      a logical decoding message has been decoded.
+<programlisting>
+typedef void (*LogicalDecodeMessageCB) (
+    struct LogicalDecodingContext *,
+    ReorderBufferTXN *txn,
+    XLogRecPtr message_lsn,
+    bool transactional,
+    const char *prefix,
+    Size message_size,
+    const char *message
+);

We should at least document what txn is set to if not transactional.


Will do (it's NULL).
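
As an illustration of what that means for plugin authors, below is a standalone sketch of a callback body that tolerates txn being NULL for non-transactional messages. MockTXN and describe_message are hypothetical stand-ins invented for this example; the real callback receives a ReorderBufferTXN and the full argument list shown in the typedef above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

/* Stand-in for ReorderBufferTXN; the real struct has many more fields. */
typedef struct MockTXN
{
    unsigned int xid;
} MockTXN;

/*
 * Hypothetical callback body: writes a short description of the message
 * into out. For non-transactional messages txn is NULL, so it must not
 * be dereferenced. Returns the value of snprintf().
 */
int
describe_message(char *out, size_t outlen, const MockTXN *txn,
                 bool transactional, const char *prefix,
                 size_t message_size)
{
    assert(out != NULL && prefix != NULL);

    if (transactional && txn != NULL)
        return snprintf(out, outlen, "xid %u prefix %s size %zu",
                        txn->xid, prefix, message_size);

    /* Non-transactional: there is no transaction to report. */
    return snprintf(out, outlen, "no xid prefix %s size %zu",
                    prefix, message_size);
}
```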

+void
+logicalmsg_desc(StringInfo buf, XLogReaderState *record)
+{
+       char                       *rec = XLogRecGetData(record);
+       xl_logical_message *xlrec = (xl_logical_message *) rec;
+
+       appendStringInfo(buf, "%s message size %zu bytes",
+                                        xlrec->transactional ? "transactional" : "nontransactional",
+                                        xlrec->message_size);
+}

Shouldn't we check
           uint8         info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
           if (info == XLOG_LOGICAL_MESSAGE)
here?


I thought it's kinda pointless, but we seem to be doing it in other places so will add.
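
Roughly, the check would mask off the low info bits and only describe the record when the remaining bits match the logical message record type. The sketch below is standalone and uses mock constants and a mock function (MOCK_XLR_INFO_MASK, MOCK_XLOG_LOGICAL_MESSAGE, mock_logicalmsg_desc); the constant values are assumptions for illustration, and the real function would work on an XLogReaderState and a StringInfo rather than raw arguments.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Values assumed for illustration; the real ones come from PostgreSQL headers. */
#define MOCK_XLR_INFO_MASK        0x0F
#define MOCK_XLOG_LOGICAL_MESSAGE 0x00

/*
 * Mock of the desc routine: raw_info plays the role of
 * XLogRecGetInfo(record). The low bits covered by the mask are cleared
 * before comparing against the record type. Returns the number of
 * characters written, or 0 if the record type is not recognized.
 */
int
mock_logicalmsg_desc(char *buf, size_t buflen, uint8_t raw_info,
                     bool transactional, size_t message_size)
{
    uint8_t info = (uint8_t) (raw_info & ~MOCK_XLR_INFO_MASK);

    if (info != MOCK_XLOG_LOGICAL_MESSAGE)
    {
        /* Unknown record type: describe nothing. */
        if (buflen > 0)
            buf[0] = '\0';
        return 0;
    }

    return snprintf(buf, buflen, "%s message size %zu bytes",
                    transactional ? "transactional" : "nontransactional",
                    message_size);
}
```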


+void
+ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
+                                                 bool transactional, const char *prefix, Size msg_sz,
+                                                 const char *msg)
+{
+       ReorderBufferTXN *txn = NULL;
+
+       if (transactional)
+       {
+               ReorderBufferChange *change;
+
+               txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
+
+               Assert(xid != InvalidTransactionId);
+               Assert(txn != NULL);
+
+               change = ReorderBufferGetChange(rb);
+               change->action = REORDER_BUFFER_CHANGE_MESSAGE;
+               change->data.msg.transactional = true;
+               change->data.msg.prefix = pstrdup(prefix);
+               change->data.msg.message_size = msg_sz;
+               change->data.msg.message = palloc(msg_sz);
+               memcpy(change->data.msg.message, msg, msg_sz);
+
+               ReorderBufferQueueChange(rb, xid, lsn, change);
+       }
+       else
+       {
+               rb->message(rb, txn, lsn, transactional, prefix, msg_sz, msg);
+       }
+}


This approach prohibits catalog access when processing a nontransactional
message as there's no snapshot set up.


Hmm, I do see the usefulness of having a snapshot, although I wonder if that does not kill the point of non-transactional messages. The question then is which snapshot the message should see: the base_snapshot of the transaction? That would mean we'd have to call SnapBuildProcessChange for non-transactional messages, which we currently avoid. Alternatively, we could probably invent a lighter version of that interface that would just make sure builder->snapshot is valid and, if not, build it, but I am honestly not sure if that's a win or not.

--
  Petr Jelinek                  http://www.2ndQuadrant.com/
  PostgreSQL Development, 24x7 Support, Training & Services

