On Fri, Oct 31, 2014 at 5:46 PM,  <furu...@pm.nttdata.co.jp> wrote:
>> > We seem to be going in circles. You suggested having two options,
>> > --feedback, and --fsync, which is almost exactly what Furuya posted
>> > originally. I objected to that, because I think that user interface is
>> > too complicated. Instead, I suggested having just a single option
>> > called --synchronous, or even better, have no option at all and have
>> > the server tell the client if it's participating in synchronous
>> > replication, and have pg_receivexlog automatically fsync when it is,
>> > and not otherwise [1]. That way you don't need to expose any new
>> > options to the user. What did you think of that idea?
>>
>> I think it's pretty weird to have the fsync behavior of the client be
>> controlled by the server.
>>
>> I also don't think that fsync() on the client side is useless in
>> asynchronous replication.  Yeah, it's true that there are no
>> *guarantees* with asynchronous replication, but the bound on how long
>> the data can take to get out to disk is a heck of a lot shorter if you
>> fsync frequently than if you don't.  And on the flip side, that has a
>> performance impact.
>>
>> So I actually think the design you proposed is not as good as what was
>> proposed by Furuya and Simon.  But I don't feel incredibly strongly about
>> it.
>
> Thanks for lots of comments!!
>
> I fixed the patch.
> By default, it behaves like a walreceiver.
> Like walreceiver, it fsyncs and sends feedback after each fsync.

On second thought, flipping the default behavior does not seem worthwhile here;
it might surprise existing users and cause trouble for them.
I'd like to go back to Heikki's original suggestion of just adding a
--synchronous option. That is, only when --synchronous is specified is WAL
flushed and feedback sent back as soon as WAL is received.

I changed the patch heavily along those lines. Please find the attached patch.
By default, pg_receivexlog flushes WAL data only when a WAL file is closed.
If --synchronous is specified, then like the standby's WAL receiver,
sync commands are issued as soon as there is WAL data which has not been
flushed yet. Also, status packets are sent back to the server just after
WAL data is flushed, regardless of what --status-interval is set to. I added
a description of this behavior to the docs.
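
To make the intended control flow easier to review, here is a minimal
standalone sketch of the decision described above. It is not the patch code:
WalPos, ReceiverState, needs_flush(), after_write() and on_segment_close() are
hypothetical names for illustration, and the counters stand in for the real
fsync(walfile) and sendFeedback() calls in receivelog.c.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical stand-in for XLogRecPtr. */
typedef uint64_t WalPos;

/* Hypothetical receiver state; counters replace real I/O for illustration. */
typedef struct ReceiverState
{
	WalPos		last_flush;		/* last position known to be on disk */
	int			flush_calls;	/* number of simulated fsync() calls */
	int			feedback_sent;	/* number of simulated status packets */
} ReceiverState;

/*
 * Flush only when --synchronous was given, a WAL file is open, and there
 * is written-but-unflushed WAL data.
 */
static bool
needs_flush(const ReceiverState *st, WalPos written, bool file_open,
			bool synchronous)
{
	return synchronous && file_open && st->last_flush < written;
}

/* Called once per loop iteration, after received WAL has been written. */
static void
after_write(ReceiverState *st, WalPos written, bool file_open,
			bool synchronous)
{
	/* The write position never moves behind the flush position. */
	assert(written >= st->last_flush);

	if (!needs_flush(st, written, file_open, synchronous))
		return;

	st->flush_calls++;			/* real code: fsync(walfile) */
	st->last_flush = written;
	st->feedback_sent++;		/* real code: sendFeedback(), which ignores
								 * --status-interval in this path */
}

/* Default path: the only flush happens when a WAL file is closed. */
static void
on_segment_close(ReceiverState *st, WalPos segment_end)
{
	st->flush_calls++;
	st->last_flush = segment_end;
}
```

In this sketch, a non-synchronous receiver advances last_flush only through
on_segment_close(), while a synchronous one flushes and reports on every
iteration that wrote new data, and does nothing when there is nothing new.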

Thoughts?

Regards,

-- 
Fujii Masao
*** a/doc/src/sgml/ref/pg_receivexlog.sgml
--- b/doc/src/sgml/ref/pg_receivexlog.sgml
***************
*** 49,54 **** PostgreSQL documentation
--- 49,61 ----
    </para>
  
    <para>
+    Unlike the standby's WAL receiver, <application>pg_receivexlog</>
+    flushes WAL data only when WAL file is closed, by default.
+    <literal>--synchronous</> option must be specified to flush WAL data
+    in real time and ensure it's safely flushed to disk.
+   </para>
+ 
+   <para>
     The transaction log is streamed over a regular
     <productname>PostgreSQL</productname> connection, and uses the replication
     protocol. The connection must be made with a superuser or a user
***************
*** 86,106 **** PostgreSQL documentation
       </varlistentry>
  
       <varlistentry>
-        <term><option>-F <replaceable class="parameter">interval</replaceable></option></term>
-        <term><option>--fsync-interval=<replaceable class="parameter">interval</replaceable></option></term>
-        <listitem>
-         <para>
-         Specifies the maximum time to issue sync commands to ensure the
-         received WAL file is safely flushed to disk, in seconds. The default
-         value is zero, which disables issuing fsyncs except when WAL file is
-         closed. If <literal>-1</literal> is specified, WAL file is flushed as
-         soon as possible, that is, as soon as there are WAL data which has
-         not been flushed yet.
-         </para>
-        </listitem>
-       </varlistentry>
- 
-      <varlistentry>
        <term><option>-n</option></term>
        <term><option>--no-loop</option></term>
        <listitem>
--- 93,98 ----
***************
*** 135,150 **** PostgreSQL documentation
           When this option is used, <application>pg_receivexlog</> will report
           a flush position to the server, indicating when each segment has been
           synchronized to disk so that the server can remove that segment if it
!          is not otherwise needed.  When using this parameter, it is important
!          to make sure that <application>pg_receivexlog</> cannot become the
!          synchronous standby through an incautious setting of
!          <xref linkend="guc-synchronous-standby-names">; it does not flush
!          data frequently enough for this to work correctly.
          </para>
        </listitem>
       </varlistentry>
  
       <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
--- 127,152 ----
           When this option is used, <application>pg_receivexlog</> will report
           a flush position to the server, indicating when each segment has been
           synchronized to disk so that the server can remove that segment if it
!          is not otherwise needed. <literal>--synchronous</literal> option must
!         be specified when making <application>pg_receivexlog</> run as
!         synchronous standby by using replication slot. Otherwise WAL data
!         cannot be flushed frequently enough for this to work correctly.
          </para>
        </listitem>
       </varlistentry>
  
       <varlistentry>
+       <term><option>--synchronous</option></term>
+       <listitem>
+        <para>
+         Issue sync commands as soon as there is WAL data which has not been
+         flushed yet. Also status packets are sent back to the server just after
+         WAL data is flushed whatever <literal>--status-interval</> is set to.
+        </para>
+       </listitem>
+      </varlistentry>
+ 
+      <varlistentry>
        <term><option>-v</option></term>
        <term><option>--verbose</option></term>
        <listitem>
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
***************
*** 370,376 **** LogStreamerMain(logstreamer_param *param)
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, 0))
  
  		/*
  		 * Any errors will already have been reported in the function process,
--- 370,376 ----
  	if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
  						   param->sysidentifier, param->xlogdir,
  						   reached_end_position, standby_message_timeout,
! 						   NULL, false))
  
  		/*
  		 * Any errors will already have been reported in the function process,
*** a/src/bin/pg_basebackup/pg_receivexlog.c
--- b/src/bin/pg_basebackup/pg_receivexlog.c
***************
*** 36,45 **** static char *basedir = NULL;
  static int	verbose = 0;
  static int	noloop = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
- static int	fsync_interval = 0; /* 0 = default */
  static volatile bool time_to_abort = false;
  static bool do_create_slot = false;
  static bool do_drop_slot = false;
  
  
  static void usage(void);
--- 36,45 ----
  static int	verbose = 0;
  static int	noloop = 0;
  static int	standby_message_timeout = 10 * 1000;		/* 10 sec = default */
  static volatile bool time_to_abort = false;
  static bool do_create_slot = false;
  static bool do_drop_slot = false;
+ static bool synchronous = false;
  
  
  static void usage(void);
***************
*** 66,77 **** usage(void)
  	printf(_("  %s [OPTION]...\n"), progname);
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
- 	printf(_("  -F  --fsync-interval=SECS\n"
- 			 "                         time between fsyncs to transaction log files (default: %d)\n"), (fsync_interval / 1000));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
  	printf(_("  -s, --status-interval=SECS\n"
  			 "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
  	printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
--- 66,76 ----
  	printf(_("  %s [OPTION]...\n"), progname);
  	printf(_("\nOptions:\n"));
  	printf(_("  -D, --directory=DIR    receive transaction log files into this directory\n"));
  	printf(_("  -n, --no-loop          do not loop on connection lost\n"));
  	printf(_("  -s, --status-interval=SECS\n"
  			 "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
  	printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
+ 	printf(_("      --synchronous      flush transaction log in real time\n"));
  	printf(_("  -v, --verbose          output verbose messages\n"));
  	printf(_("  -V, --version          output version information, then exit\n"));
  	printf(_("  -?, --help             show this help, then exit\n"));
***************
*** 343,349 **** StreamLog(void)
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
  					  stop_streaming, standby_message_timeout, ".partial",
! 					  fsync_interval);
  
  	PQfinish(conn);
  	conn = NULL;
--- 342,348 ----
  
  	ReceiveXlogStream(conn, startpos, starttli, NULL, basedir,
  					  stop_streaming, standby_message_timeout, ".partial",
! 					  synchronous);
  
  	PQfinish(conn);
  	conn = NULL;
***************
*** 374,380 **** main(int argc, char **argv)
  		{"port", required_argument, NULL, 'p'},
  		{"username", required_argument, NULL, 'U'},
  		{"no-loop", no_argument, NULL, 'n'},
- 		{"fsync-interval", required_argument, NULL, 'F'},
  		{"no-password", no_argument, NULL, 'w'},
  		{"password", no_argument, NULL, 'W'},
  		{"status-interval", required_argument, NULL, 's'},
--- 373,378 ----
***************
*** 383,388 **** main(int argc, char **argv)
--- 381,387 ----
  /* action */
  		{"create-slot", no_argument, NULL, 1},
  		{"drop-slot", no_argument, NULL, 2},
+ 		{"synchronous", no_argument, NULL, 3},
  		{NULL, 0, NULL, 0}
  	};
  
***************
*** 408,414 **** main(int argc, char **argv)
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nF:wWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
--- 407,413 ----
  		}
  	}
  
! 	while ((c = getopt_long(argc, argv, "D:d:h:p:U:s:S:nwWv",
  							long_options, &option_index)) != -1)
  	{
  		switch (c)
***************
*** 455,469 **** main(int argc, char **argv)
  			case 'n':
  				noloop = 1;
  				break;
- 		case 'F':
- 			fsync_interval = atoi(optarg) * 1000;
- 			if (fsync_interval < -1000)
- 			{
- 				fprintf(stderr, _("%s: invalid fsync interval \"%s\"\n"),
- 						progname, optarg);
- 				exit(1);
- 			}
- 			break;
  			case 'v':
  				verbose++;
  				break;
--- 454,459 ----
***************
*** 474,479 **** main(int argc, char **argv)
--- 464,472 ----
  			case 2:
  				do_drop_slot = true;
  				break;
+ 			case 3:
+ 				synchronous = true;
+ 				break;
  			default:
  
  				/*
*** a/src/bin/pg_basebackup/receivelog.c
--- b/src/bin/pg_basebackup/receivelog.c
***************
*** 31,44 **** static char current_walfile_name[MAXPGPATH] = "";
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
- static int64 last_fsync = -1;		/* timestamp of last WAL file flush */
  static bool still_sending = true;		/* feedback still needs to be sent? */
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
  				  char *partial_suffix, XLogRecPtr *stoppos,
! 				  int fsync_interval);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
--- 31,43 ----
  static bool reportFlushPosition = false;
  static XLogRecPtr lastFlushPosition = InvalidXLogRecPtr;
  
  static bool still_sending = true;		/* feedback still needs to be sent? */
  
  static PGresult *HandleCopyStream(PGconn *conn, XLogRecPtr startpos,
  				 uint32 timeline, char *basedir,
  			   stream_stop_callback stream_stop, int standby_message_timeout,
  				  char *partial_suffix, XLogRecPtr *stoppos,
! 				  bool synchronous);
  static int CopyStreamPoll(PGconn *conn, long timeout_ms);
  static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
  static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
***************
*** 55,62 **** static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
  								stream_stop_callback stream_stop,
  								char *partial_suffix, XLogRecPtr *stoppos);
  static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! 										 int64 last_status, int fsync_interval,
! 										 XLogRecPtr blockpos);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
--- 54,60 ----
  								stream_stop_callback stream_stop,
  								char *partial_suffix, XLogRecPtr *stoppos);
  static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! 										 int64 last_status);
  
  static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
  						 uint32 *timeline);
***************
*** 209,215 **** close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
  				progname, current_walfile_name, partial_suffix);
  
  	lastFlushPosition = pos;
- 	last_fsync = feGetCurrentTimestamp();
  	return true;
  }
  
--- 207,212 ----
***************
*** 440,447 **** CheckServerVersionForStreaming(PGconn *conn)
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
!  * fsync_interval controls how often we flush to the received WAL file,
!  * in milliseconds.
   *
   * Note: The log position *must* be at a log segment start!
   */
--- 437,444 ----
   * allows you to tell the difference between partial and completed files,
   * so that you can continue later where you left.
   *
!  * If 'synchronous' is true, the received WAL is flushed as soon as written,
!  * otherwise only when the WAL file is closed.
   *
   * Note: The log position *must* be at a log segment start!
   */
***************
*** 450,456 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout, char *partial_suffix,
! 				  int fsync_interval)
  {
  	char		query[128];
  	char		slotcmd[128];
--- 447,453 ----
  				  char *sysidentifier, char *basedir,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout, char *partial_suffix,
! 				  bool synchronous)
  {
  	char		query[128];
  	char		slotcmd[128];
***************
*** 595,601 **** ReceiveXlogStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, fsync_interval);
  		if (res == NULL)
  			goto error;
  
--- 592,598 ----
  		/* Stream the WAL */
  		res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
  							   standby_message_timeout, partial_suffix,
! 							   &stoppos, synchronous);
  		if (res == NULL)
  			goto error;
  
***************
*** 760,766 **** static PGresult *
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, int fsync_interval)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
--- 757,763 ----
  HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  				 char *basedir, stream_stop_callback stream_stop,
  				 int standby_message_timeout, char *partial_suffix,
! 				 XLogRecPtr *stoppos, bool synchronous)
  {
  	char	   *copybuf = NULL;
  	int64		last_status = -1;
***************
*** 784,797 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		now = feGetCurrentTimestamp();
  
  		/*
! 		 * If fsync_interval has elapsed since last WAL flush and we've written
! 		 * some WAL data, flush them to disk.
  		 */
  		if (lastFlushPosition < blockpos &&
! 			walfile != -1 &&
! 			((fsync_interval > 0 &&
! 			  feTimestampDifferenceExceeds(last_fsync, now, fsync_interval)) ||
! 			 fsync_interval < 0))
  		{
  			if (fsync(walfile) != 0)
  			{
--- 781,791 ----
  		now = feGetCurrentTimestamp();
  
  		/*
! 		 * Issue sync command as soon as there are WAL data which
! 		 * has not been flushed yet if synchronous option is true.
  		 */
  		if (lastFlushPosition < blockpos &&
! 			walfile != -1 && synchronous)
  		{
  			if (fsync(walfile) != 0)
  			{
***************
*** 799,807 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  						progname, current_walfile_name, strerror(errno));
  				goto error;
  			}
- 
  			lastFlushPosition = blockpos;
! 			last_fsync = now;
  		}
  
  		/*
--- 793,807 ----
  						progname, current_walfile_name, strerror(errno));
  				goto error;
  			}
  			lastFlushPosition = blockpos;
! 
! 			/*
! 			 * Send feedback so that the server sees the latest WAL locations
! 			 * immediately if synchronous option is true.
! 			 */
! 			if (!sendFeedback(conn, blockpos, now, false))
! 				goto error;
! 			last_status = now;
  		}
  
  		/*
***************
*** 821,827 **** HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
  		 * Calculate how long send/receive loops should sleep
  		 */
  		sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
! 												 last_status, fsync_interval, blockpos);
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
  		while (r != 0)
--- 821,827 ----
  		 * Calculate how long send/receive loops should sleep
  		 */
  		sleeptime = CalculateCopyStreamSleeptime(now, standby_message_timeout,
! 												 last_status);
  
  		r = CopyStreamReceive(conn, sleeptime, &copybuf);
  		while (r != 0)
***************
*** 1244,1277 **** CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
   */
  static long
  CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! 							 int64 last_status, int fsync_interval, XLogRecPtr blockpos)
  {
- 	int64		targettime = 0;
  	int64		status_targettime = 0;
- 	int64		fsync_targettime = 0;
  	long		sleeptime;
  
  	if (standby_message_timeout && still_sending)
  		status_targettime = last_status +
  			(standby_message_timeout - 1) * ((int64) 1000);
  
! 	if (fsync_interval > 0 && lastFlushPosition < blockpos)
! 		fsync_targettime = last_fsync +
! 			(fsync_interval - 1) * ((int64) 1000);
! 
! 	if ((status_targettime < fsync_targettime && status_targettime > 0) ||
! 		fsync_targettime == 0)
! 		targettime = status_targettime;
! 	else
! 		targettime = fsync_targettime;
! 
! 	if (targettime > 0)
  	{
  		long		secs;
  		int			usecs;
  
  		feTimestampDifference(now,
! 							  targettime,
  							  &secs,
  							  &usecs);
  		/* Always sleep at least 1 sec */
--- 1244,1265 ----
   */
  static long
  CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
! 							 int64 last_status)
  {
  	int64		status_targettime = 0;
  	long		sleeptime;
  
  	if (standby_message_timeout && still_sending)
  		status_targettime = last_status +
  			(standby_message_timeout - 1) * ((int64) 1000);
  
! 	if (status_targettime > 0)
  	{
  		long		secs;
  		int			usecs;
  
  		feTimestampDifference(now,
! 							  status_targettime,
  							  &secs,
  							  &usecs);
  		/* Always sleep at least 1 sec */
*** a/src/bin/pg_basebackup/receivelog.h
--- b/src/bin/pg_basebackup/receivelog.h
***************
*** 31,36 **** extern bool ReceiveXlogStream(PGconn *conn,
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
  				  char *partial_suffix,
! 				  int fsync_interval);
  
  #endif	/* RECEIVELOG_H */
--- 31,36 ----
  				  stream_stop_callback stream_stop,
  				  int standby_message_timeout,
  				  char *partial_suffix,
! 				  bool synchronous);
  
  #endif	/* RECEIVELOG_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
