On 04/02/2012 01:03 PM, Tom Lane wrote:
> Andrew Dunstan <and...@dunslane.net> writes:
>> On 04/02/2012 12:44 PM, Tom Lane wrote:
>>> You could do something like having a list of pending chunks for each
>>> value of (pid mod 256). The length of each such list ought to be plenty
>>> short under ordinary circumstances.
>> Yeah, ok, that should work. How big would we make each list to start
>> with? Still 20, or smaller?
> When I said "list", I meant a "List *". No fixed size.
Ok, like this? Do we consider this a bug fix, to be backpatched?
cheers
andrew
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index a603ab4..2e4d437 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -35,6 +35,7 @@
#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "pgtime.h"
+#include "nodes/pg_list.h"
#include "postmaster/fork_process.h"
#include "postmaster/postmaster.h"
#include "postmaster/syslogger.h"
@@ -93,10 +94,11 @@ static char *last_file_name = NULL;
static char *last_csv_file_name = NULL;
/*
- * Buffers for saving partial messages from different backends. We don't expect
- * that there will be very many outstanding at one time, so 20 seems plenty of
- * leeway. If this array gets full we won't lose messages, but we will lose
- * the protocol protection against them being partially written or interleaved.
+ * Buffers for saving partial messages from different backends.
+ *
+ * Keep 256 lists of these, the list for a given pid being pid mod 256,
+ * so that for any message we don't have to search a long list looking
+ * for a match and/or an open slot.
*
* An inactive buffer has pid == 0 and undefined contents of data.
*/
@@ -106,8 +108,8 @@ typedef struct
StringInfoData data; /* accumulated data, as a StringInfo */
} save_buffer;
-#define CHUNK_SLOTS 20
-static save_buffer saved_chunks[CHUNK_SLOTS];
+#define CHUNK_STRIPES 256
+static List *buffer_stripes[CHUNK_STRIPES];
/* These must be exported for EXEC_BACKEND case ... annoying */
#ifndef WIN32
@@ -725,6 +727,7 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
{
PipeProtoHeader p;
int chunklen;
+ List *buffer_stripe;
/* Do we have a valid header? */
memcpy(&p, cursor, sizeof(PipeProtoHeader));
@@ -743,53 +746,55 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
dest = (p.is_last == 'T' || p.is_last == 'F') ?
LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
+ buffer_stripe = buffer_stripes[p.pid % CHUNK_STRIPES];
+
if (p.is_last == 'f' || p.is_last == 'F')
{
/*
- * Save a complete non-final chunk in the per-pid buffer if
- * possible - if not just write it out.
+ * Save a complete non-final chunk in the per-pid buffer
*/
- int free_slot = -1,
- existing_slot = -1;
- int i;
+ ListCell *cell;
+ save_buffer *existing_slot = NULL,
+ *free_slot = NULL;
StringInfo str;
- for (i = 0; i < CHUNK_SLOTS; i++)
+ foreach (cell, buffer_stripe)
{
- if (saved_chunks[i].pid == p.pid)
+ save_buffer *buffnode = cell->data.ptr_value;
+ if (buffnode->pid == p.pid)
{
- existing_slot = i;
+ existing_slot = buffnode;
break;
}
- if (free_slot < 0 && saved_chunks[i].pid == 0)
- free_slot = i;
+ if (free_slot == NULL && buffnode->pid == 0)
+ free_slot = buffnode;
}
- if (existing_slot >= 0)
+ if (existing_slot != NULL)
{
- str = &(saved_chunks[existing_slot].data);
+ str = &(existing_slot->data);
appendBinaryStringInfo(str,
cursor + PIPE_HEADER_SIZE,
p.len);
}
- else if (free_slot >= 0)
+ else
{
- saved_chunks[free_slot].pid = p.pid;
- str = &(saved_chunks[free_slot].data);
+ if (free_slot == NULL)
+ {
+ /*
+ * Need a free slot, but there isn't one in the list,
+ * so create a new one and extend the list with it.
+ */
+ free_slot = palloc(sizeof(save_buffer));
+ buffer_stripe = lappend(buffer_stripe, free_slot);
+ buffer_stripes[p.pid % CHUNK_STRIPES] = buffer_stripe;
+ }
+ free_slot->pid = p.pid;
+ str = &(free_slot->data);
initStringInfo(str);
appendBinaryStringInfo(str,
cursor + PIPE_HEADER_SIZE,
p.len);
}
- else
- {
- /*
- * If there is no free slot we'll just have to take our
- * chances and write out a partial message and hope that
- * it's not followed by something from another pid.
- */
- write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len,
- dest);
- }
}
else
{
@@ -797,26 +802,28 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
* Final chunk --- add it to anything saved for that pid, and
* either way write the whole thing out.
*/
- int existing_slot = -1;
- int i;
+ ListCell *cell;
+ save_buffer *existing_slot = NULL;
StringInfo str;
- for (i = 0; i < CHUNK_SLOTS; i++)
+ foreach (cell, buffer_stripe)
{
- if (saved_chunks[i].pid == p.pid)
+ save_buffer *buffnode = cell->data.ptr_value;
+ if (buffnode->pid == p.pid)
{
- existing_slot = i;
+ existing_slot = buffnode;
break;
}
}
- if (existing_slot >= 0)
+
+ if (existing_slot != NULL)
{
- str = &(saved_chunks[existing_slot].data);
+ str = &(existing_slot->data);
appendBinaryStringInfo(str,
cursor + PIPE_HEADER_SIZE,
p.len);
write_syslogger_file(str->data, str->len, dest);
- saved_chunks[existing_slot].pid = 0;
+ existing_slot->pid = 0;
pfree(str->data);
}
else
@@ -871,18 +878,26 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
static void
flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
{
- int i;
+ int stripe;
+ List *list;
+ ListCell *cell;
StringInfo str;
/* Dump any incomplete protocol messages */
- for (i = 0; i < CHUNK_SLOTS; i++)
+ for (stripe = 0; stripe < CHUNK_STRIPES; stripe++)
{
- if (saved_chunks[i].pid != 0)
+ list = buffer_stripes[stripe];
+
+ foreach (cell, list)
{
- str = &(saved_chunks[i].data);
- write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
- saved_chunks[i].pid = 0;
- pfree(str->data);
+ save_buffer *buf = cell->data.ptr_value;
+ if (buf->pid != 0)
+ {
+ str = &(buf->data);
+ write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
+ buf->pid = 0;
+ pfree(str->data);
+ }
}
}
--
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers