On Thu, 26 Oct 2023 at 17:00, David Rowley <dgrowle...@gmail.com> wrote:
> Thanks for looking at this again. I fixed up each of those and pushed
> the result, mentioning the incompatibility in the commit message.
>
> Now that that's done, I've attached a patch which makes use of the new
> initReadOnlyStringInfo initializer function for the original case
> mentioned when I opened this thread. I don't think there are any
> remaining objections to this, but I'll let it sit for a bit to see.

I've just pushed the deserial function optimisation patch.

I was just looking at a few other places where we might want to make
use of initReadOnlyStringInfo.

* parallel.c in HandleParallelMessages():

Drilling into HandleParallelMessage(), I see the PqMsg_BackendKeyData
case just reads a fixed number of bytes.  In some of the other
"switch" cases, I see calls to pq_getmsgrawstring() either directly or
indirectly.  I see the counterpart to pq_getmsgrawstring() is
pq_sendstring() which always appends the NUL char to the StringInfo,
so I don't think it's a problem if the received bytes are not NUL
terminated, as cstrings seem to be sent with the NUL terminator.

This case just seems to handle ERROR/NOTICE messages coming from
parallel workers, not tuples themselves. It may not be that
interesting a case to speed up.

* applyparallelworker.c in HandleParallelApplyMessages():

Drilling into HandleParallelApplyMessage(), I don't see anything there
that needs the input StringInfo to be NUL terminated.

* worker.c in apply_spooled_messages():

Drilling into apply_dispatch() and going through each of the cases, I
see logicalrep_read_tuple() pallocs a new buffer and ensures it's
always NUL terminated, which will be required in LOGICALREP_COLUMN_TEXT
mode. (There seem to be further optimisation opportunities there,
where we could avoid the palloc when in LOGICALREP_COLUMN_BINARY mode
and just point value's buffer directly to the correct portion of the
input StringInfo's buffer.)

* walreceiver.c in XLogWalRcvProcessMsg():

Nothing there seems to require the incoming_message StringInfo to have
a NUL terminator.  I imagine this one is the most worthwhile to do out
of the 4.  I've not tested to see if there are any performance
improvements.

Does anyone see any reason why we can't do the attached?

David
diff --git a/src/backend/access/transam/parallel.c 
b/src/backend/access/transam/parallel.c
index 194a1207be..f3708005d2 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -1087,10 +1087,8 @@ HandleParallelMessages(void)
                                {
                                        StringInfoData msg;
 
-                                       initStringInfo(&msg);
-                                       appendBinaryStringInfo(&msg, data, 
nbytes);
+                                       initReadOnlyStringInfo(&msg, data, 
nbytes);
                                        HandleParallelMessage(pcxt, i, &msg);
-                                       pfree(msg.data);
                                }
                                else
                                        ereport(ERROR,
diff --git a/src/backend/replication/logical/applyparallelworker.c 
b/src/backend/replication/logical/applyparallelworker.c
index 9b37736f8e..fcc33bda10 100644
--- a/src/backend/replication/logical/applyparallelworker.c
+++ b/src/backend/replication/logical/applyparallelworker.c
@@ -1117,10 +1117,8 @@ HandleParallelApplyMessages(void)
                {
                        StringInfoData msg;
 
-                       initStringInfo(&msg);
-                       appendBinaryStringInfo(&msg, data, nbytes);
+                       initReadOnlyStringInfo(&msg, data, nbytes);
                        HandleParallelApplyMessage(&msg);
-                       pfree(msg.data);
                }
                else
                        ereport(ERROR,
diff --git a/src/backend/replication/logical/worker.c 
b/src/backend/replication/logical/worker.c
index ba67eb156f..52a9f136ab 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2019,7 +2019,6 @@ void
 apply_spooled_messages(FileSet *stream_fileset, TransactionId xid,
                                           XLogRecPtr lsn)
 {
-       StringInfoData s2;
        int                     nchanges;
        char            path[MAXPGPATH];
        char       *buffer = NULL;
@@ -2057,7 +2056,6 @@ apply_spooled_messages(FileSet *stream_fileset, 
TransactionId xid,
        CurrentResourceOwner = oldowner;
 
        buffer = palloc(BLCKSZ);
-       initStringInfo(&s2);
 
        MemoryContextSwitchTo(oldcxt);
 
@@ -2079,6 +2077,7 @@ apply_spooled_messages(FileSet *stream_fileset, 
TransactionId xid,
        nchanges = 0;
        while (true)
        {
+               StringInfoData s2;
                size_t          nbytes;
                int                     len;
 
@@ -2104,9 +2103,8 @@ apply_spooled_messages(FileSet *stream_fileset, 
TransactionId xid,
 
                BufFileTell(stream_fd, &fileno, &offset);
 
-               /* copy the buffer to the stringinfo and call apply_dispatch */
-               resetStringInfo(&s2);
-               appendBinaryStringInfo(&s2, buffer, len);
+               /* init a stringinfo using the buffer and call apply_dispatch */
+               initReadOnlyStringInfo(&s2, buffer, len);
 
                /* Ensure we are reading the data into our memory context. */
                oldcxt = MemoryContextSwitchTo(ApplyMessageContext);
diff --git a/src/backend/replication/walreceiver.c 
b/src/backend/replication/walreceiver.c
index a3128874b2..2398167f49 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -132,7 +132,6 @@ typedef enum WalRcvWakeupReason
 static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
 
 static StringInfoData reply_message;
-static StringInfoData incoming_message;
 
 /* Prototypes for private functions */
 static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
@@ -425,7 +424,6 @@ WalReceiverMain(void)
                        /* Initialize LogstreamResult and buffers for 
processing messages */
                        LogstreamResult.Write = LogstreamResult.Flush = 
GetXLogReplayRecPtr(NULL);
                        initStringInfo(&reply_message);
-                       initStringInfo(&incoming_message);
 
                        /* Initialize nap wakeup times. */
                        now = GetCurrentTimestamp();
@@ -843,19 +841,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
        TimestampTz sendTime;
        bool            replyRequested;
 
-       resetStringInfo(&incoming_message);
-
        switch (type)
        {
                case 'w':                               /* WAL records */
                        {
-                               /* copy message to StringInfo */
+                               StringInfoData incoming_message;
+
                                hdrlen = sizeof(int64) + sizeof(int64) + 
sizeof(int64);
                                if (len < hdrlen)
                                        ereport(ERROR,
                                                        
(errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                         
errmsg_internal("invalid WAL message received from primary")));
-                               appendBinaryStringInfo(&incoming_message, buf, 
hdrlen);
+
+                               /* initialize a StringInfo with the given 
buffer */
+                               initReadOnlyStringInfo(&incoming_message, buf, 
hdrlen);
 
                                /* read the fields */
                                dataStart = pq_getmsgint64(&incoming_message);
@@ -870,13 +869,16 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size 
len, TimeLineID tli)
                        }
                case 'k':                               /* Keepalive */
                        {
-                               /* copy message to StringInfo */
+                               StringInfoData incoming_message;
+
                                hdrlen = sizeof(int64) + sizeof(int64) + 
sizeof(char);
                                if (len != hdrlen)
                                        ereport(ERROR,
                                                        
(errcode(ERRCODE_PROTOCOL_VIOLATION),
                                                         
errmsg_internal("invalid keepalive message received from primary")));
-                               appendBinaryStringInfo(&incoming_message, buf, 
hdrlen);
+
+                               /* initialize a StringInfo with the given 
buffer */
+                               initReadOnlyStringInfo(&incoming_message, buf, 
hdrlen);
 
                                /* read the fields */
                                walEnd = pq_getmsgint64(&incoming_message);

Reply via email to