On 03.10.2012 18:15, Amit Kapila wrote:
35.WalSenderMain(void)
{
..
+                if (walsender_shutdown_requested)
+                        ereport(FATAL,
+                                        (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                                         errmsg("terminating replication connection due to administrator command")));
+
+                /* Tell the client that we are ready to receive commands */

+                ReadyForQuery(DestRemote);
+
..

+                if (walsender_shutdown_requested)
+                        ereport(FATAL,
+                                        (errcode(ERRCODE_ADMIN_SHUTDOWN),
+                                         errmsg("terminating replication connection due to administrator command")));
+

Is it necessary to check walsender_shutdown_requested twice in the
loop? If yes, can we add a comment explaining why it is important to
check it again?

The idea was to check for shutdown request before and after the pq_getbyte() call, because that can block for a long time.

Looking closer, we don't currently (i.e. without this patch) make any effort to react to SIGTERM in a walsender while it's waiting for a command from the client. After starting replication, it does check walsender_shutdown_requested in the loop, and it's also checked during a base backup (although only when switching to send the next file, which seems too infrequent). This issue is orthogonal to handling timeline changes over streaming replication, although that patch will make it more important to handle SIGTERM quickly while waiting for a command, because you stay in that mode for longer and more often.

I think walsender needs to share more infrastructure with regular backends to handle this better. When we first implemented streaming replication in 9.0, it made sense to implement just the bare minimum needed to accept the handshake commands before entering the Copy state, but now that the replication command set has grown to cover base backups, and fetching timelines with the patch being discussed, we should bite the bullet and make the command loop more feature-complete and robust.

In a regular backend, the command loop reacts to SIGTERM immediately, setting ImmediateInterruptOK at the right places, and calling CHECK_FOR_INTERRUPTS() at strategic places. I propose that we let PostgresMain handle the main command loop for walsender processes too, like it does for regular backends, and use ProcDiePending and the regular die() signal handler for SIGTERM in walsender as well.

So I propose the attached patch. I made small changes to postgres.c to make it call exec_replication_command() instead of exec_simple_query(), and to reject the extended query protocol, in a WAL sender process. A lot of code related to handling the main command loop and signals is removed from walsender.c.

- Heikki
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 22,27 ****
--- 22,28 ----
  #include "lib/stringinfo.h"
  #include "libpq/libpq.h"
  #include "libpq/pqformat.h"
+ #include "miscadmin.h"
  #include "nodes/pg_list.h"
  #include "replication/basebackup.h"
  #include "replication/walsender.h"
***************
*** 30,36 ****
  #include "storage/ipc.h"
  #include "utils/builtins.h"
  #include "utils/elog.h"
- #include "utils/memutils.h"
  #include "utils/ps_status.h"
  
  typedef struct
--- 31,36 ----
***************
*** 370,388 **** void
  SendBaseBackup(BaseBackupCmd *cmd)
  {
  	DIR		   *dir;
- 	MemoryContext backup_context;
- 	MemoryContext old_context;
  	basebackup_options opt;
  
  	parse_basebackup_options(cmd->options, &opt);
  
- 	backup_context = AllocSetContextCreate(CurrentMemoryContext,
- 										   "Streaming base backup context",
- 										   ALLOCSET_DEFAULT_MINSIZE,
- 										   ALLOCSET_DEFAULT_INITSIZE,
- 										   ALLOCSET_DEFAULT_MAXSIZE);
- 	old_context = MemoryContextSwitchTo(backup_context);
- 
  	WalSndSetState(WALSNDSTATE_BACKUP);
  
  	if (update_process_title)
--- 370,379 ----
***************
*** 403,411 **** SendBaseBackup(BaseBackupCmd *cmd)
  	perform_base_backup(&opt, dir);
  
  	FreeDir(dir);
- 
- 	MemoryContextSwitchTo(old_context);
- 	MemoryContextDelete(backup_context);
  }
  
  static void
--- 394,399 ----
***************
*** 606,612 **** sendDir(char *path, int basepathlen, bool sizeonly)
  		 * error in that case. The error handler further up will call
  		 * do_pg_abort_backup() for us.
  		 */
! 		if (walsender_shutdown_requested || walsender_ready_to_stop)
  			ereport(ERROR,
  				(errmsg("shutdown requested, aborting active base backup")));
  
--- 594,600 ----
  		 * error in that case. The error handler further up will call
  		 * do_pg_abort_backup() for us.
  		 */
! 		if (ProcDiePending || walsender_ready_to_stop)
  			ereport(ERROR,
  				(errmsg("shutdown requested, aborting active base backup")));
  
*** a/src/backend/replication/walsender.c
--- b/src/backend/replication/walsender.c
***************
*** 113,133 **** static TimestampTz last_reply_timestamp;
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
- volatile sig_atomic_t walsender_shutdown_requested = false;
  volatile sig_atomic_t walsender_ready_to_stop = false;
  
  /* Signal handlers */
  static void WalSndSigHupHandler(SIGNAL_ARGS);
- static void WalSndShutdownHandler(SIGNAL_ARGS);
- static void WalSndQuickDieHandler(SIGNAL_ARGS);
  static void WalSndXLogSendHandler(SIGNAL_ARGS);
  static void WalSndLastCycleHandler(SIGNAL_ARGS);
  
  /* Prototypes for private functions */
- static bool HandleReplicationCommand(const char *cmd_string);
  static void WalSndLoop(void) __attribute__((noreturn));
! static void InitWalSnd(void);
! static void WalSndHandshake(void);
  static void WalSndKill(int code, Datum arg);
  static void XLogSend(char *msgbuf, bool *caughtup);
  static void IdentifySystem(void);
--- 113,128 ----
  
  /* Flags set by signal handlers for later service in main loop */
  static volatile sig_atomic_t got_SIGHUP = false;
  volatile sig_atomic_t walsender_ready_to_stop = false;
  
  /* Signal handlers */
  static void WalSndSigHupHandler(SIGNAL_ARGS);
  static void WalSndXLogSendHandler(SIGNAL_ARGS);
  static void WalSndLastCycleHandler(SIGNAL_ARGS);
  
  /* Prototypes for private functions */
  static void WalSndLoop(void) __attribute__((noreturn));
! static void InitWalSenderSlot(void);
  static void WalSndKill(int code, Datum arg);
  static void XLogSend(char *msgbuf, bool *caughtup);
  static void IdentifySystem(void);
***************
*** 139,284 **** static void ProcessRepliesIfAny(void);
  static void WalSndKeepalive(char *msgbuf);
  
  
! /* Main entry point for walsender process */
  void
! WalSenderMain(void)
  {
- 	MemoryContext walsnd_context;
- 
  	am_cascading_walsender = RecoveryInProgress();
  
  	/* Create a per-walsender data structure in shared memory */
! 	InitWalSnd();
! 
! 	/*
! 	 * Create a memory context that we will do all our work in.  We do this so
! 	 * that we can reset the context during error recovery and thereby avoid
! 	 * possible memory leaks.  Formerly this code just ran in
! 	 * TopMemoryContext, but resetting that would be a really bad idea.
! 	 *
! 	 * XXX: we don't actually attempt error recovery in walsender, we just
! 	 * close the connection and exit.
! 	 */
! 	walsnd_context = AllocSetContextCreate(TopMemoryContext,
! 										   "Wal Sender",
! 										   ALLOCSET_DEFAULT_MINSIZE,
! 										   ALLOCSET_DEFAULT_INITSIZE,
! 										   ALLOCSET_DEFAULT_MAXSIZE);
! 	MemoryContextSwitchTo(walsnd_context);
  
  	/* Set up resource owner */
  	CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
  
- 	/* Unblock signals (they were blocked when the postmaster forked us) */
- 	PG_SETMASK(&UnBlockSig);
- 
  	/*
  	 * Use the recovery target timeline ID during recovery
  	 */
  	if (am_cascading_walsender)
  		ThisTimeLineID = GetRecoveryTargetTLI();
- 
- 	/* Tell the standby that walsender is ready for receiving commands */
- 	ReadyForQuery(DestRemote);
- 
- 	/* Handle handshake messages before streaming */
- 	WalSndHandshake();
- 
- 	/* Initialize shared memory status */
- 	{
- 		/* use volatile pointer to prevent code rearrangement */
- 		volatile WalSnd *walsnd = MyWalSnd;
- 
- 		SpinLockAcquire(&walsnd->mutex);
- 		walsnd->sentPtr = sentPtr;
- 		SpinLockRelease(&walsnd->mutex);
- 	}
- 
- 	SyncRepInitConfig();
- 
- 	/* Main loop of walsender */
- 	WalSndLoop();
  }
  
  /*
!  * Execute commands from walreceiver, until we enter streaming mode.
   */
! static void
! WalSndHandshake(void)
  {
! 	StringInfoData input_message;
! 	bool		replication_started = false;
! 
! 	initStringInfo(&input_message);
! 
! 	while (!replication_started)
  	{
! 		int			firstchar;
! 
! 		WalSndSetState(WALSNDSTATE_STARTUP);
! 		set_ps_display("idle", false);
! 
! 		/* Wait for a command to arrive */
! 		firstchar = pq_getbyte();
! 
! 		/*
! 		 * Emergency bailout if postmaster has died.  This is to avoid the
! 		 * necessity for manual cleanup of all postmaster children.
! 		 */
! 		if (!PostmasterIsAlive())
! 			exit(1);
! 
! 		/*
! 		 * Check for any other interesting events that happened while we
! 		 * slept.
! 		 */
! 		if (got_SIGHUP)
! 		{
! 			got_SIGHUP = false;
! 			ProcessConfigFile(PGC_SIGHUP);
! 		}
! 
! 		if (firstchar != EOF)
! 		{
! 			/*
! 			 * Read the message contents. This is expected to be done without
! 			 * blocking because we've been able to get message type code.
! 			 */
! 			if (pq_getmessage(&input_message, 0))
! 				firstchar = EOF;	/* suitable message already logged */
! 		}
! 
! 		/* Handle the very limited subset of commands expected in this phase */
! 		switch (firstchar)
! 		{
! 			case 'Q':			/* Query message */
! 				{
! 					const char *query_string;
! 
! 					query_string = pq_getmsgstring(&input_message);
! 					pq_getmsgend(&input_message);
! 
! 					if (HandleReplicationCommand(query_string))
! 						replication_started = true;
! 				}
! 				break;
! 
! 			case 'X':
! 				/* standby is closing the connection */
! 				proc_exit(0);
! 
! 			case EOF:
! 				/* standby disconnected unexpectedly */
! 				ereport(COMMERROR,
! 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 						 errmsg("unexpected EOF on standby connection")));
! 				proc_exit(0);
! 
! 			default:
! 				ereport(FATAL,
! 						(errcode(ERRCODE_PROTOCOL_VIOLATION),
! 						 errmsg("invalid standby handshake message type %d", firstchar)));
! 		}
  	}
  }
  
--- 134,172 ----
  static void WalSndKeepalive(char *msgbuf);
  
  
! /* Initialize walsender process before entering the main command loop */
  void
! InitWalSender(void)
  {
  	am_cascading_walsender = RecoveryInProgress();
  
  	/* Create a per-walsender data structure in shared memory */
! 	InitWalSenderSlot();
  
  	/* Set up resource owner */
  	CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner");
  
  	/*
  	 * Use the recovery target timeline ID during recovery
  	 */
  	if (am_cascading_walsender)
  		ThisTimeLineID = GetRecoveryTargetTLI();
  }
  
  /*
!  * Clean up after an error.
!  *
!  * WAL sender processes don't use transactions like regular backends do.
!  * This should do any cleanup required in a WAL sender process, similar to
!  * what transaction abort does in a regular backend.
   */
! void
! WalSndErrorCleanup()
  {
! 	if (sendFile >= 0)
  	{
! 		close(sendFile);
! 		sendFile = -1;
  	}
  }
  
***************
*** 350,364 **** IdentifySystem(void)
  	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
  
  	pq_endmessage(&buf);
- 
- 	/* Send CommandComplete and ReadyForQuery messages */
- 	EndCommand("SELECT", DestRemote);
- 	ReadyForQuery(DestRemote);
- 	/* ReadyForQuery did pq_flush for us */
  }
  
  /*
!  * START_REPLICATION
   */
  static void
  StartReplication(StartReplicationCmd *cmd)
--- 238,250 ----
  	pq_sendbytes(&buf, (char *) xpos, strlen(xpos));
  
  	pq_endmessage(&buf);
  }
  
  /*
!  * Handle START_REPLICATION command.
!  *
!  * At the moment, this never returns, but an ereport(ERROR) will take us back
!  * to the main loop.
   */
  static void
  StartReplication(StartReplicationCmd *cmd)
***************
*** 435,449 **** StartReplication(StartReplicationCmd *cmd)
  	 * be shipped from that position
  	 */
  	sentPtr = cmd->startpoint;
  }
  
  /*
   * Execute an incoming replication command.
   */
! static bool
! HandleReplicationCommand(const char *cmd_string)
  {
- 	bool		replication_started = false;
  	int			parse_rc;
  	Node	   *cmd_node;
  	MemoryContext cmd_context;
--- 321,349 ----
  	 * be shipped from that position
  	 */
  	sentPtr = cmd->startpoint;
+ 
+ 	/* Also update the start position status in shared memory */
+ 	{
+ 		/* use volatile pointer to prevent code rearrangement */
+ 		volatile WalSnd *walsnd = MyWalSnd;
+ 
+ 		SpinLockAcquire(&walsnd->mutex);
+ 		walsnd->sentPtr = sentPtr;
+ 		SpinLockRelease(&walsnd->mutex);
+ 	}
+ 
+ 	SyncRepInitConfig();
+ 
+ 	/* Main loop of walsender */
+ 	WalSndLoop();
  }
  
  /*
   * Execute an incoming replication command.
   */
! void
! exec_replication_command(const char *cmd_string)
  {
  	int			parse_rc;
  	Node	   *cmd_node;
  	MemoryContext cmd_context;
***************
*** 451,456 **** HandleReplicationCommand(const char *cmd_string)
--- 351,358 ----
  
  	elog(DEBUG1, "received replication command: %s", cmd_string);
  
+ 	CHECK_FOR_INTERRUPTS();
+ 
  	cmd_context = AllocSetContextCreate(CurrentMemoryContext,
  										"Replication command context",
  										ALLOCSET_DEFAULT_MINSIZE,
***************
*** 476,493 **** HandleReplicationCommand(const char *cmd_string)
  
  		case T_StartReplicationCmd:
  			StartReplication((StartReplicationCmd *) cmd_node);
- 
- 			/* break out of the loop */
- 			replication_started = true;
  			break;
  
  		case T_BaseBackupCmd:
  			SendBaseBackup((BaseBackupCmd *) cmd_node);
- 
- 			/* Send CommandComplete and ReadyForQuery messages */
- 			EndCommand("SELECT", DestRemote);
- 			ReadyForQuery(DestRemote);
- 			/* ReadyForQuery did pq_flush for us */
  			break;
  
  		default:
--- 378,387 ----
***************
*** 500,506 **** HandleReplicationCommand(const char *cmd_string)
  	MemoryContextSwitchTo(old_context);
  	MemoryContextDelete(cmd_context);
  
! 	return replication_started;
  }
  
  /*
--- 394,401 ----
  	MemoryContextSwitchTo(old_context);
  	MemoryContextDelete(cmd_context);
  
! 	/* Send CommandComplete message */
! 	EndCommand("SELECT", DestRemote);
  }
  
  /*
***************
*** 754,768 **** WalSndLoop(void)
  			SyncRepInitConfig();
  		}
  
! 		/* Normal exit from the walsender is here */
! 		if (walsender_shutdown_requested)
! 		{
! 			/* Inform the standby that XLOG streaming is done */
! 			pq_puttextmessage('C', "COPY 0");
! 			pq_flush();
! 
! 			proc_exit(0);
! 		}
  
  		/* Check for input from the client */
  		ProcessRepliesIfAny();
--- 649,655 ----
  			SyncRepInitConfig();
  		}
  
! 		CHECK_FOR_INTERRUPTS();
  
  		/* Check for input from the client */
  		ProcessRepliesIfAny();
***************
*** 813,819 **** WalSndLoop(void)
  				XLogSend(output_message, &caughtup);
  				if (caughtup && !pq_is_send_pending())
  				{
! 					walsender_shutdown_requested = true;
  					continue;	/* don't want to wait more */
  				}
  			}
--- 700,706 ----
  				XLogSend(output_message, &caughtup);
  				if (caughtup && !pq_is_send_pending())
  				{
! 					ProcDiePending = true;
  					continue;	/* don't want to wait more */
  				}
  			}
***************
*** 854,861 **** WalSndLoop(void)
--- 741,751 ----
  			}
  
  			/* Sleep until something happens or replication timeout */
+ 			ImmediateInterruptOK = true;
+ 			CHECK_FOR_INTERRUPTS();
  			WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents,
  							  MyProcPort->sock, sleeptime);
+ 			ImmediateInterruptOK = false;
  
  			/*
  			 * Check for replication timeout.  Note we ignore the corner case
***************
*** 892,898 **** WalSndLoop(void)
  
  /* Initialize a per-walsender data structure for this walsender process */
  static void
! InitWalSnd(void)
  {
  	int			i;
  
--- 782,788 ----
  
  /* Initialize a per-walsender data structure for this walsender process */
  static void
! InitWalSenderSlot(void)
  {
  	int			i;
  
***************
*** 1284,1341 **** WalSndSigHupHandler(SIGNAL_ARGS)
  	errno = save_errno;
  }
  
- /* SIGTERM: set flag to shut down */
- static void
- WalSndShutdownHandler(SIGNAL_ARGS)
- {
- 	int			save_errno = errno;
- 
- 	walsender_shutdown_requested = true;
- 	if (MyWalSnd)
- 		SetLatch(&MyWalSnd->latch);
- 
- 	/*
- 	 * Set the standard (non-walsender) state as well, so that we can abort
- 	 * things like do_pg_stop_backup().
- 	 */
- 	InterruptPending = true;
- 	ProcDiePending = true;
- 
- 	errno = save_errno;
- }
- 
- /*
-  * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster.
-  *
-  * Some backend has bought the farm,
-  * so we need to stop what we're doing and exit.
-  */
- static void
- WalSndQuickDieHandler(SIGNAL_ARGS)
- {
- 	PG_SETMASK(&BlockSig);
- 
- 	/*
- 	 * We DO NOT want to run proc_exit() callbacks -- we're here because
- 	 * shared memory may be corrupted, so we don't want to try to clean up our
- 	 * transaction.  Just nail the windows shut and get out of town.  Now that
- 	 * there's an atexit callback to prevent third-party code from breaking
- 	 * things by calling exit() directly, we have to reset the callbacks
- 	 * explicitly to make this work as intended.
- 	 */
- 	on_exit_reset();
- 
- 	/*
- 	 * Note we do exit(2) not exit(0).	This is to force the postmaster into a
- 	 * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random
- 	 * backend.  This is necessary precisely because we don't clean up our
- 	 * shared memory state.  (The "dead man switch" mechanism in pmsignal.c
- 	 * should ensure the postmaster sees this as a crash, too, but no harm in
- 	 * being doubly sure.)
- 	 */
- 	exit(2);
- }
- 
  /* SIGUSR1: set flag to send WAL records */
  static void
  WalSndXLogSendHandler(SIGNAL_ARGS)
--- 1174,1179 ----
***************
*** 1368,1375 **** WalSndSignals(void)
  	pqsignal(SIGHUP, WalSndSigHupHandler);		/* set flag to read config
  												 * file */
  	pqsignal(SIGINT, SIG_IGN);	/* not used */
! 	pqsignal(SIGTERM, WalSndShutdownHandler);	/* request shutdown */
! 	pqsignal(SIGQUIT, WalSndQuickDieHandler);	/* hard crash time */
  	InitializeTimeouts();		/* establishes SIGALRM handler */
  	pqsignal(SIGPIPE, SIG_IGN);
  	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
--- 1206,1213 ----
  	pqsignal(SIGHUP, WalSndSigHupHandler);		/* set flag to read config
  												 * file */
  	pqsignal(SIGINT, SIG_IGN);	/* not used */
! 	pqsignal(SIGTERM, die);						/* request shutdown */
! 	pqsignal(SIGQUIT, quickdie);				/* hard crash time */
  	InitializeTimeouts();		/* establishes SIGALRM handler */
  	pqsignal(SIGPIPE, SIG_IGN);
  	pqsignal(SIGUSR1, WalSndXLogSendHandler);	/* request WAL sending */
*** a/src/backend/tcop/postgres.c
--- b/src/backend/tcop/postgres.c
***************
*** 192,197 **** static int	InteractiveBackend(StringInfo inBuf);
--- 192,198 ----
  static int	interactive_getc(void);
  static int	SocketBackend(StringInfo inBuf);
  static int	ReadCommand(StringInfo inBuf);
+ static void forbidden_in_wal_sender(char firstchar);
  static List *pg_rewrite_query(Query *query);
  static bool check_log_statement(List *stmt_list);
  static int	errdetail_execute(List *raw_parsetree_list);
***************
*** 3720,3731 **** PostgresMain(int argc, char *argv[], const char *username)
  	if (IsUnderPostmaster && Log_disconnections)
  		on_proc_exit(log_disconnections, 0);
  
! 	/* If this is a WAL sender process, we're done with initialization. */
  	if (am_walsender)
! 	{
! 		WalSenderMain();		/* does not return */
! 		abort();
! 	}
  
  	/*
  	 * process any libraries that should be preloaded at backend start (this
--- 3721,3729 ----
  	if (IsUnderPostmaster && Log_disconnections)
  		on_proc_exit(log_disconnections, 0);
  
! 	/* Perform initialization specific to a WAL sender process. */
  	if (am_walsender)
! 		InitWalSender();
  
  	/*
  	 * process any libraries that should be preloaded at backend start (this
***************
*** 3835,3840 **** PostgresMain(int argc, char *argv[], const char *username)
--- 3833,3841 ----
  		 */
  		AbortCurrentTransaction();
  
+ 		if (am_walsender)
+ 			WalSndErrorCleanup();
+ 
  		/*
  		 * Now return to normal top-level context and clear ErrorContext for
  		 * next time.
***************
*** 3969,3975 **** PostgresMain(int argc, char *argv[], const char *username)
  					query_string = pq_getmsgstring(&input_message);
  					pq_getmsgend(&input_message);
  
! 					exec_simple_query(query_string);
  
  					send_ready_for_query = true;
  				}
--- 3970,3979 ----
  					query_string = pq_getmsgstring(&input_message);
  					pq_getmsgend(&input_message);
  
! 					if (am_walsender)
! 						exec_replication_command(query_string);
! 					else
! 						exec_simple_query(query_string);
  
  					send_ready_for_query = true;
  				}
***************
*** 3982,3987 **** PostgresMain(int argc, char *argv[], const char *username)
--- 3986,3993 ----
  					int			numParams;
  					Oid		   *paramTypes = NULL;
  
+ 					forbidden_in_wal_sender(firstchar);
+ 
  					/* Set statement_timestamp() */
  					SetCurrentStatementStartTimestamp();
  
***************
*** 4004,4009 **** PostgresMain(int argc, char *argv[], const char *username)
--- 4010,4017 ----
  				break;
  
  			case 'B':			/* bind */
+ 				forbidden_in_wal_sender(firstchar);
+ 
  				/* Set statement_timestamp() */
  				SetCurrentStatementStartTimestamp();
  
***************
*** 4019,4024 **** PostgresMain(int argc, char *argv[], const char *username)
--- 4027,4034 ----
  					const char *portal_name;
  					int			max_rows;
  
+ 					forbidden_in_wal_sender(firstchar);
+ 
  					/* Set statement_timestamp() */
  					SetCurrentStatementStartTimestamp();
  
***************
*** 4031,4036 **** PostgresMain(int argc, char *argv[], const char *username)
--- 4041,4048 ----
  				break;
  
  			case 'F':			/* fastpath function call */
+ 				forbidden_in_wal_sender(firstchar);
+ 
  				/* Set statement_timestamp() */
  				SetCurrentStatementStartTimestamp();
  
***************
*** 4078,4083 **** PostgresMain(int argc, char *argv[], const char *username)
--- 4090,4097 ----
  					int			close_type;
  					const char *close_target;
  
+ 					forbidden_in_wal_sender(firstchar);
+ 
  					close_type = pq_getmsgbyte(&input_message);
  					close_target = pq_getmsgstring(&input_message);
  					pq_getmsgend(&input_message);
***************
*** 4120,4125 **** PostgresMain(int argc, char *argv[], const char *username)
--- 4134,4141 ----
  					int			describe_type;
  					const char *describe_target;
  
+ 					forbidden_in_wal_sender(firstchar);
+ 
  					/* Set statement_timestamp() (needed for xact) */
  					SetCurrentStatementStartTimestamp();
  
***************
*** 4201,4206 **** PostgresMain(int argc, char *argv[], const char *username)
--- 4217,4245 ----
  	}							/* end of input-reading loop */
  }
  
+ /*
+  * Throw an error if we're a WAL sender process.
+  *
+  * This is used to forbid anything other than simple query protocol messages
+  * in a WAL sender process. 'firstchar' specifies what kind of a forbidden
+  * message was received, and is used to construct the error message.
+  */
+ static void
+ forbidden_in_wal_sender(char firstchar)
+ {
+ 	if (am_walsender)
+ 	{
+ 		if (firstchar == 'F')
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 					 errmsg("fastpath function calls not supported in a replication connection")));
+ 		else
+ 			ereport(ERROR,
+ 					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+ 					 errmsg("extended query protocol not supported in a replication connection")));
+ 	}
+ }
+ 
  
  /*
   * Obtain platform stack depth limit (in bytes)
*** a/src/include/replication/walsender.h
--- b/src/include/replication/walsender.h
***************
*** 19,25 ****
  /* global state */
  extern bool am_walsender;
  extern bool am_cascading_walsender;
- extern volatile sig_atomic_t walsender_shutdown_requested;
  extern volatile sig_atomic_t walsender_ready_to_stop;
  extern bool wake_wal_senders;
  
--- 19,24 ----
***************
*** 27,33 **** extern bool wake_wal_senders;
  extern int	max_wal_senders;
  extern int	replication_timeout;
  
! extern void WalSenderMain(void) __attribute__((noreturn));
  extern void WalSndSignals(void);
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
--- 26,34 ----
  extern int	max_wal_senders;
  extern int	replication_timeout;
  
! extern void InitWalSender(void);
! extern void exec_replication_command(const char *query_string);
! extern void WalSndErrorCleanup(void);
  extern void WalSndSignals(void);
  extern Size WalSndShmemSize(void);
  extern void WalSndShmemInit(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to