On Thu, 26 Oct 2023 at 17:00, David Rowley <dgrowle...@gmail.com> wrote: > Thanks for looking at this again. I fixed up each of those and pushed > the result, mentioning the incompatibility in the commit message. > > Now that that's done, I've attached a patch which makes use of the new > initReadOnlyStringInfo initializer function for the original case > mentioned when I opened this thread. I don't think there are any > remaining objections to this, but I'll let it sit for a bit to see.
I've just pushed the deserial function optimisation patch. I was just looking at a few other places where we might want to make use of initReadOnlyStringInfo. * parallel.c in HandleParallelMessages(): Drilling into HandleParallelMessage(), I see the PqMsg_BackendKeyData case just reads a fixed number of bytes. In some of the other "switch" cases, I see calls pq_getmsgrawstring() either directly or indirectly. I see the counterpart to pq_getmsgrawstring() is pq_sendstring() which always appends the NUL char to the StringInfo, so I don't think not NUL terminating the received bytes is a problem as cstrings seem to be sent with the NUL terminator. This case just seems to handle ERROR/NOTICE messages coming from parallel workers. Not tuples themselves. It may not be that interesting a case to speed up. * applyparallelworker.c in HandleParallelApplyMessages(): Drilling into HandleParallelApplyMessage(), I don't see anything there that needs the input StringInfo to be NUL terminated. * worker.c in apply_spooled_messages(): Drilling into apply_dispatch() and going through each of the cases, I see logicalrep_read_tuple() pallocs a new buffer and ensures it's always NUL terminated which will be required in LOGICALREP_COLUMN_TEXT mode. (There seems to be further optimisation opportunities there where we could not do the palloc when in LOGICALREP_COLUMN_BINARY mode and just point value's buffer directly to the correct portion of the input StringInfo's buffer). * walreceiver.c in XLogWalRcvProcessMsg(): Nothing there seems to require the incoming_message StringInfo to have a NUL terminator. I imagine this one is the most worthwhile to do out of the 4. I've not tested to see if there are any performance improvements. Does anyone see any reason why we can't do the attached? David
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 194a1207be..f3708005d2 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -1087,10 +1087,8 @@ HandleParallelMessages(void) { StringInfoData msg; - initStringInfo(&msg); - appendBinaryStringInfo(&msg, data, nbytes); + initReadOnlyStringInfo(&msg, data, nbytes); HandleParallelMessage(pcxt, i, &msg); - pfree(msg.data); } else ereport(ERROR, diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 9b37736f8e..fcc33bda10 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -1117,10 +1117,8 @@ HandleParallelApplyMessages(void) { StringInfoData msg; - initStringInfo(&msg); - appendBinaryStringInfo(&msg, data, nbytes); + initReadOnlyStringInfo(&msg, data, nbytes); HandleParallelApplyMessage(&msg); - pfree(msg.data); } else ereport(ERROR, diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index ba67eb156f..52a9f136ab 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2019,7 +2019,6 @@ void apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, XLogRecPtr lsn) { - StringInfoData s2; int nchanges; char path[MAXPGPATH]; char *buffer = NULL; @@ -2057,7 +2056,6 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, CurrentResourceOwner = oldowner; buffer = palloc(BLCKSZ); - initStringInfo(&s2); MemoryContextSwitchTo(oldcxt); @@ -2079,6 +2077,7 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, nchanges = 0; while (true) { + StringInfoData s2; size_t nbytes; int len; @@ -2104,9 +2103,8 @@ apply_spooled_messages(FileSet *stream_fileset, TransactionId xid, BufFileTell(stream_fd, &fileno, &offset); - /* copy the buffer to the stringinfo and call apply_dispatch */ - resetStringInfo(&s2); - appendBinaryStringInfo(&s2, buffer, len); + /* init a stringinfo using the buffer and call apply_dispatch */ + initReadOnlyStringInfo(&s2, buffer, len); /* Ensure we are reading the data into our memory context. */ oldcxt = MemoryContextSwitchTo(ApplyMessageContext); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a3128874b2..2398167f49 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -132,7 +132,6 @@ typedef enum WalRcvWakeupReason static TimestampTz wakeup[NUM_WALRCV_WAKEUPS]; static StringInfoData reply_message; -static StringInfoData incoming_message; /* Prototypes for private functions */ static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last); @@ -425,7 +424,6 @@ WalReceiverMain(void) /* Initialize LogstreamResult and buffers for processing messages */ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL); initStringInfo(&reply_message); - initStringInfo(&incoming_message); /* Initialize nap wakeup times. */ now = GetCurrentTimestamp(); @@ -843,19 +841,20 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) TimestampTz sendTime; bool replyRequested; - resetStringInfo(&incoming_message); - switch (type) { case 'w': /* WAL records */ { - /* copy message to StringInfo */ + StringInfoData incoming_message; + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); if (len < hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary"))); - appendBinaryStringInfo(&incoming_message, buf, hdrlen); + + /* initialize a StringInfo with the given buffer */ + initReadOnlyStringInfo(&incoming_message, buf, hdrlen); /* read the fields */ dataStart = pq_getmsgint64(&incoming_message); @@ -870,13 +869,16 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli) } case 'k': /* Keepalive */ { - /* copy message to StringInfo */ + StringInfoData incoming_message; + hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); if (len != hdrlen) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary"))); - appendBinaryStringInfo(&incoming_message, buf, hdrlen); + + /* initialize a StringInfo with the given buffer */ + initReadOnlyStringInfo(&incoming_message, buf, hdrlen); /* read the fields */ walEnd = pq_getmsgint64(&incoming_message);