On 04/01/2012 06:34 PM, Andrew Dunstan wrote:
Some of my PostgreSQL Experts colleagues have been complaining to me that servers under load with very large queries produce corrupted CSV log files, because lines are apparently multiplexed. The log chunking protocol between the errlog routines and the syslogger is supposed to prevent that, so I did a little work to try to reproduce it in a controlled way.


Well, a little further digging jogged my memory a bit. It looks like we fairly badly underestimated the number of messages that might arrive in more than one chunk.

We could just increase CHUNK_SLOTS in syslogger.c, but I opted instead to stripe the slots with a two-dimensional array, so we don't have to search a larger number of slots for any given message. See the attached patch.

I'm not sure how much we want to scale this up. I set CHUNK_STRIPES to 20 to start with, and I've asked some colleagues who have very heavy log loads and very large queries to test it out if possible. If anyone else has a similar load, I'd appreciate similar testing.

cheers

andrew
*** a/src/backend/postmaster/syslogger.c
--- b/src/backend/postmaster/syslogger.c
***************
*** 93,101 **** static char *last_file_name = NULL;
  static char *last_csv_file_name = NULL;
  
  /*
!  * Buffers for saving partial messages from different backends. We don't expect
!  * that there will be very many outstanding at one time, so 20 seems plenty of
!  * leeway. If this array gets full we won't lose messages, but we will lose
   * the protocol protection against them being partially written or interleaved.
   *
   * An inactive buffer has pid == 0 and undefined contents of data.
--- 93,108 ----
  static char *last_csv_file_name = NULL;
  
  /*
!  * Buffers for saving partial messages from different backends.
!  *
!  * Under heavy load we can get quite a few of them, so we stripe them across
!  * an array based on the mod of the pid, which seems an adequate hash function
!  * for pids. We do this rather than just keeping a one-dimensional array so
!  * we don't have to probe too many slots for any given pid.
!  * 20 stripes of 20 slots each seems likely to be adequate leeway, but if there
!  * are lots of overruns we might need to increase CHUNK_STRIPES a bit, or look
!  * at some alternative scheme.
!  * If a stripe gets full we won't lose messages, but we will lose
   * the protocol protection against them being partially written or interleaved.
   *
   * An inactive buffer has pid == 0 and undefined contents of data.
***************
*** 107,113 **** typedef struct
  } save_buffer;
  
  #define CHUNK_SLOTS 20
! static save_buffer saved_chunks[CHUNK_SLOTS];
  
  /* These must be exported for EXEC_BACKEND case ... annoying */
  #ifndef WIN32
--- 114,121 ----
  } save_buffer;
  
  #define CHUNK_SLOTS 20
! #define CHUNK_STRIPES 20
! static save_buffer saved_chunks[CHUNK_STRIPES][CHUNK_SLOTS];
  
  /* These must be exported for EXEC_BACKEND case ... annoying */
  #ifndef WIN32
***************
*** 725,730 **** process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
--- 733,739 ----
  	{
  		PipeProtoHeader p;
  		int			chunklen;
+ 		int         stripe;
  
  		/* Do we have a valid header? */
  		memcpy(&p, cursor, sizeof(PipeProtoHeader));
***************
*** 743,748 **** process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
--- 752,759 ----
  			dest = (p.is_last == 'T' || p.is_last == 'F') ?
  				LOG_DESTINATION_CSVLOG : LOG_DESTINATION_STDERR;
  
+ 			stripe = p.pid % CHUNK_STRIPES;
+ 
  			if (p.is_last == 'f' || p.is_last == 'F')
  			{
  				/*
***************
*** 756,780 **** process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
  
  				for (i = 0; i < CHUNK_SLOTS; i++)
  				{
! 					if (saved_chunks[i].pid == p.pid)
  					{
  						existing_slot = i;
  						break;
  					}
! 					if (free_slot < 0 && saved_chunks[i].pid == 0)
  						free_slot = i;
  				}
  				if (existing_slot >= 0)
  				{
! 					str = &(saved_chunks[existing_slot].data);
  					appendBinaryStringInfo(str,
  										   cursor + PIPE_HEADER_SIZE,
  										   p.len);
  				}
  				else if (free_slot >= 0)
  				{
! 					saved_chunks[free_slot].pid = p.pid;
! 					str = &(saved_chunks[free_slot].data);
  					initStringInfo(str);
  					appendBinaryStringInfo(str,
  										   cursor + PIPE_HEADER_SIZE,
--- 767,791 ----
  
  				for (i = 0; i < CHUNK_SLOTS; i++)
  				{
! 					if (saved_chunks[stripe][i].pid == p.pid)
  					{
  						existing_slot = i;
  						break;
  					}
! 					if (free_slot < 0 && saved_chunks[stripe][i].pid == 0)
  						free_slot = i;
  				}
  				if (existing_slot >= 0)
  				{
! 					str = &(saved_chunks[stripe][existing_slot].data);
  					appendBinaryStringInfo(str,
  										   cursor + PIPE_HEADER_SIZE,
  										   p.len);
  				}
  				else if (free_slot >= 0)
  				{
! 					saved_chunks[stripe][free_slot].pid = p.pid;
! 					str = &(saved_chunks[stripe][free_slot].data);
  					initStringInfo(str);
  					appendBinaryStringInfo(str,
  										   cursor + PIPE_HEADER_SIZE,
***************
*** 803,809 **** process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
  
  				for (i = 0; i < CHUNK_SLOTS; i++)
  				{
! 					if (saved_chunks[i].pid == p.pid)
  					{
  						existing_slot = i;
  						break;
--- 814,820 ----
  
  				for (i = 0; i < CHUNK_SLOTS; i++)
  				{
! 					if (saved_chunks[stripe][i].pid == p.pid)
  					{
  						existing_slot = i;
  						break;
***************
*** 811,822 **** process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
  				}
  				if (existing_slot >= 0)
  				{
! 					str = &(saved_chunks[existing_slot].data);
  					appendBinaryStringInfo(str,
  										   cursor + PIPE_HEADER_SIZE,
  										   p.len);
  					write_syslogger_file(str->data, str->len, dest);
! 					saved_chunks[existing_slot].pid = 0;
  					pfree(str->data);
  				}
  				else
--- 822,833 ----
  				}
  				if (existing_slot >= 0)
  				{
! 					str = &(saved_chunks[stripe][existing_slot].data);
  					appendBinaryStringInfo(str,
  										   cursor + PIPE_HEADER_SIZE,
  										   p.len);
  					write_syslogger_file(str->data, str->len, dest);
! 					saved_chunks[stripe][existing_slot].pid = 0;
  					pfree(str->data);
  				}
  				else
***************
*** 871,890 **** process_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
  static void
  flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
  {
! 	int			i;
  	StringInfo	str;
  
  	/* Dump any incomplete protocol messages */
! 	for (i = 0; i < CHUNK_SLOTS; i++)
! 	{
! 		if (saved_chunks[i].pid != 0)
  		{
! 			str = &(saved_chunks[i].data);
! 			write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
! 			saved_chunks[i].pid = 0;
! 			pfree(str->data);
  		}
- 	}
  
  	/*
  	 * Force out any remaining pipe data as-is; we don't bother trying to
--- 882,902 ----
  static void
  flush_pipe_input(char *logbuffer, int *bytes_in_logbuffer)
  {
! 	int			i, stripe;
  	StringInfo	str;
  
  	/* Dump any incomplete protocol messages */
! 	for (stripe = 0; stripe < CHUNK_STRIPES; stripe++)
! 		for (i = 0; i < CHUNK_SLOTS; i++)
  		{
! 			if (saved_chunks[stripe][i].pid != 0)
! 			{
! 				str = &(saved_chunks[stripe][i].data);
! 				write_syslogger_file(str->data, str->len, LOG_DESTINATION_STDERR);
! 				saved_chunks[stripe][i].pid = 0;
! 				pfree(str->data);
! 			}
  		}
  
  	/*
  	 * Force out any remaining pipe data as-is; we don't bother trying to
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to