On 04/02/2012 01:03 PM, Tom Lane wrote:
Andrew Dunstan<and...@dunslane.net>  writes:
On 04/02/2012 12:44 PM, Tom Lane wrote:
You could do something like having a list of pending chunks for each
value of (pid mod 256).  The length of each such list ought to be plenty
short under ordinary circumstances.
Yeah, ok, that should work. How big would we make each list to start
with? Still 20, or smaller?
When I said "list", I meant a "List *".  No fixed size.

                        

Ok, like this? Do we consider this a bug fix, to be backpatched?

cheers

andrew


diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index a603ab4..2e4d437 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -35,6 +35,7 @@
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "pgtime.h"
+#include "nodes/pg_list.h"
 #include "postmaster/fork_process.h"
 #include "postmaster/postmaster.h"
 #include "postmaster/syslogger.h"
@@ -93,10 +94,11 @@ static char *last_file_name = NULL;
 static char *last_csv_file_name = NULL;
 
 /*
- * Buffers for saving partial messages from different backends. We don't expect
- * that there will be very many outstanding at one time, so 20 seems plenty of
- * leeway. If this array gets full we won't lose messages, but we will lose
- * the protocol protection against them being partially written or interleaved.
+ * Buffers for saving partial messages from different backends.
+ *
+ * Keep 256 lists of these, the list for a given pid being pid mod 256,
+ * so that for any message we don't have to search a long list looking
+ * for a match and/or an open slot.
  *
  * An inactive buffer has pid == 0 and undefined contents of data.
  */
@@ -106,8 +108,8 @@ typedef struct
 	StringInfoData data;		/* accumulated data, as a StringInfo */
 } save_buffer;
 
-#define CHUNK_SLOTS 20
-static save_buffer saved_chunks[CHUNK_SLOTS];
+#define CHUNK_STRIPES 256
+static List *buffer_stripes[CHUNK_STRIPES];
 
 /* These must be exported for EXEC_BACKEND case ... annoying */
 #ifndef WIN32
@@ -725,6 +727,7 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
 	{
 		PipeProtoHeader p;
 		int			chunklen;
+		List         *buffer_stripe;
 
 		/* Do we have a valid header? */
 		memcpy(&p, cursor, sizeof(PipeProtoHeader));
@@ -743,53 +746,55 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
 			dest = (p.is_last == 'T' || p.is_last == 'F') ?
 				LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
 
+			buffer_stripe = buffer_stripes[p.pid % CHUNK_STRIPES];
+
 			if (p.is_last == 'f' || p.is_last == 'F')
 			{
 				/*
-				 * Save a complete non-final chunk in the per-pid buffer if
-				 * possible - if not just write it out.
+				 * Save a complete non-final chunk in the per-pid buffer
 				 */
-				int			free_slot = -1,
-							existing_slot = -1;
-				int			i;
+				ListCell    *cell;
+				save_buffer *existing_slot = NULL, 
+					*free_slot = NULL;
 				StringInfo	str;
 
-				for (i = 0; i < CHUNK_SLOTS; i++)
+				foreach (cell, buffer_stripe)
 				{
-					if (saved_chunks[i].pid == p.pid)
+					save_buffer *buffnode = cell->data.ptr_value;
+					if (buffnode->pid == p.pid)
 					{
-						existing_slot = i;
+						existing_slot = buffnode;
 						break;
 					}
-					if (free_slot < 0 && saved_chunks[i].pid == 0)
-						free_slot = i;
+					if (free_slot == NULL && buffnode->pid == 0)
+						free_slot = buffnode;
 				}
-				if (existing_slot >= 0)
+				if (existing_slot != NULL)
 				{
-					str = &(saved_chunks[existing_slot].data);
+					str = &(existing_slot->data);
 					appendBinaryStringInfo(str,
 										   cursor + PIPE_HEADER_SIZE,
 										   p.len);
 				}
-				else if (free_slot >= 0)
+				else 
 				{
-					saved_chunks[free_slot].pid = p.pid;
-					str = &(saved_chunks[free_slot].data);
+					if (free_slot == NULL)
+					{
+						/* 
+						 * Need a free slot, but there isn't one in the list,
+						 * so create a new one and extend the list with it.
+						 */
+						free_slot = palloc(sizeof(save_buffer));
+						buffer_stripe = lappend(buffer_stripe, free_slot);
+						buffer_stripes[p.pid % CHUNK_STRIPES] = buffer_stripe;
+					}
+					free_slot->pid = p.pid;
+					str = &(free_slot->data);
 					initStringInfo(str);
 					appendBinaryStringInfo(str,
 										   cursor + PIPE_HEADER_SIZE,
 										   p.len);
 				}
-				else
-				{
-					/*
-					 * If there is no free slot we'll just have to take our
-					 * chances and write out a partial message and hope that
-					 * it's not followed by something from another pid.
-					 */
-					write_syslogger_file(cursor + PIPE_HEADER_SIZE, p.len,
-										 dest);
-				}
 			}
 			else
 			{
@@ -797,26 +802,28 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
 				 * Final chunk --- add it to anything saved for that pid, and
 				 * either way write the whole thing out.
 				 */
-				int			existing_slot = -1;
-				int			i;
+				ListCell     *cell;
+				save_buffer *existing_slot = NULL;
 				StringInfo	str;
 
-				for (i = 0; i < CHUNK_SLOTS; i++)
+				foreach (cell, buffer_stripe)
 				{
-					if (saved_chunks[i].pid == p.pid)
+					save_buffer  *buffnode = cell->data.ptr_value;
+					if (buffnode->pid == p.pid)
 					{
-						existing_slot = i;
+						existing_slot = buffnode;
 						break;
 					}
 				}
-				if (existing_slot >= 0)
+
+				if (existing_slot != NULL)
 				{
-					str = &(saved_chunks[existing_slot].data);
+					str = &(existing_slot->data);
 					appendBinaryStringInfo(str,
 										   cursor + PIPE_HEADER_SIZE,
 										   p.len);
 					write_syslogger_file(str->data, str->len, dest);
-					saved_chunks[existing_slot].pid = 0;
+					existing_slot->pid = 0;
 					pfree(str->data);
 				}
 				else
@@ -871,18 +878,26 @@ process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
 static void
 flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
 {
-	int			i;
+	int			stripe;
+	List       *list;
+	ListCell   *cell;
 	StringInfo	str;
 
 	/* Dump any incomplete protocol messages */
-	for (i = 0; i < CHUNK_SLOTS; i++)
+	for (stripe = 0; stripe < CHUNK_STRIPES; stripe++)
 	{
-		if (saved_chunks[i].pid != 0)
+		list = buffer_stripes[stripe];
+
+		foreach (cell, list)
 		{
-			str = &(saved_chunks[i].data);
-			write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
-			saved_chunks[i].pid = 0;
-			pfree(str->data);
+			save_buffer *buf = cell->data.ptr_value;
+			if (buf->pid != 0)
+			{
+				str = &(buf->data);
+				write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
+				buf->pid = 0;
+				pfree(str->data);
+			}
 		}
 	}
 
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to