Hi,

I am reviewing your patch.

On 2013-10-10 15:32, Antonin Houska wrote:
On 10/09/2013 08:56 PM, Robert Haas wrote:
There seem to be several review comments made since you posted this
version.  I'll mark this Waiting on Author in the CommitFest
application, since it seems that the patch needs to be further
updated.
Since it was waiting for reviewer, I was not sure whether I should wait
for his findings and fix everything in a single transaction.
Nevertheless, the next version is attached here.

It fixes findings reported in
http://www.postgresql.org/message-id/20130903165652.gc5...@eldon.alvh.no-ip.org

As for units
http://www.postgresql.org/message-id/20130903224127.gd7...@awork2.anarazel.de
I'm not convinced about "MB" at the moment. Unfortunately I couldn't
find any other command-line PG utility that takes an amount of data as
an option. Thus I use a single-letter suffix, just as wget does for the
same purpose.

As for this
http://www.postgresql.org/message-id/20130903125155.ga18...@awork2.anarazel.de
there eventually seems to be a consensus (I notice now the discussion
was off-list):

On 2013-09-03 23:21:57 +0200, Antonin Houska wrote:
On 09/03/2013 02:51 PM, Andres Freund wrote:

It's probably better to use latches for the waiting, those have properly
defined interruption semantics. Whether pg_usleep will be interrupted is
platform dependent...
I noticed a comment about interruptions around the definition of
pg_usleep() function, but concluded that the sleeps are rather frequent
in this application (typically in the order of tens to hundreds per
second, although the maximum of 256 might need to be decreased).
Therefore occasional interruptions shouldn't distort the actual rate
much. I'll think about it again. Thanks,
The issue is rather that you might not get woken up when you want to
be. Signal handlers in postgres tend to do a
SetLatch(&MyProc->procLatch); which then will interrupt sleeps done via
WaitLatch(). It's probably not that important with the duration of the
sleeps you have.

Greetings,

Andres Freund
// Antonin Houska (Tony)

* Is the patch in a patch format which has context?

Yes

* Does it apply cleanly to the current git master?

It applies with some offsets. Version "3a" that applies cleanly is attached.

* Does it include reasonable tests, necessary doc patches, etc?

Docs: yes. Tests: N/A

* Does the patch actually implement what it advertises?

Yes.

* Do we want that?

Yes.

* Do we already have it?

No.

* Does it follow SQL spec, or the community-agreed behavior?

There is no such SQL spec. The latest patch addresses all previously raised comments.

* Does it include pg_dump support (if applicable)?

N/A

* Are there dangers?

Yes, the "danger" of slowing down a base backup.
But that is the point.

* Have all the bases been covered?

Yes.

* Does the feature work as advertised?

Yes.

* Are there corner cases the author has failed to consider?

No.

* Are there any assertion failures or crashes?

No.

* Does the patch slow down simple tests?

No.

* If it claims to improve performance, does it?

N/A

* Does it slow down other things?

No.

* Does it follow the project coding guidelines?

Yes. One nitpick: the else branch below might need braces,
because it contains a comment in addition to the statement:

+                       /* The 'real data' starts now (header was ignored). */
+                       throttled_last = GetCurrentIntegerTimestamp();
+               }
+               else
+                       /* Disable throttling. */
+                       throttling_counter = -1;
+

* Are there portability issues?

No.

* Will it work on Windows/BSD etc?

It should. There are no dangerous calls besides the above-mentioned
pg_usleep(), and waking up from pg_usleep() early only makes the rate
limiting fluctuate slightly; it does not make it fail.
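
To illustrate why an early wakeup should only cause a fluctuation, here
is a minimal standalone sketch (not code from the patch). It mimics the
bookkeeping at the end of throttle() with a simulated clock instead of
GetCurrentIntegerTimestamp() and pg_usleep(). The names sim_now,
throttle_one_sample() and simulate() are hypothetical, and the constant
assumes "-r 1M" with the 32768-byte sample, i.e. 31250 microseconds per
sample.

```c
#include <assert.h>
#include <stdint.h>

/* Simulated clock in microseconds (stand-in for the real timestamp call). */
static int64_t sim_now = 0;

/* State named after the variables in the patch. */
static int64_t throttled_last = 0;
static const int64_t elapsed_min_unit = 31250;	/* 32768 bytes at 1048576 B/s */

/*
 * Account for exactly one sample and "sleep". early_by is the number of
 * microseconds by which the sleep is cut short (0 = not interrupted).
 */
static void
throttle_one_sample(int64_t early_by)
{
	int64_t		elapsed = sim_now - throttled_last;
	int64_t		sleep = elapsed_min_unit - elapsed;

	assert(early_by >= 0);

	if (sleep > 0)
	{
		/* The simulated sleep may end earlier than requested. */
		int64_t		slept = sleep - early_by;

		if (slept < 0)
			slept = 0;
		sim_now += slept;
	}
	else
		sleep = 0;

	/* As in the patch: the full requested sleep is booked. */
	throttled_last += elapsed + sleep;
}

/*
 * Send the given number of samples, each taking transfer_usec to transmit,
 * and interrupt every 10th sleep. Returns the simulated end time.
 */
static int64_t
simulate(int samples, int64_t transfer_usec, int64_t early_by)
{
	sim_now = 0;
	throttled_last = 0;
	for (int i = 0; i < samples; i++)
	{
		sim_now += transfer_usec;
		throttle_one_sample(i % 10 == 0 ? early_by : 0);
	}
	return sim_now;
}
```

In this model, simulate(1000, 1000, 5000) and simulate(1000, 1000, 0)
end at the same time: throttled_last is advanced by the requested sleep,
so the call after an interrupted sleep sees a smaller elapsed time and
sleeps correspondingly longer.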

* Are the comments sufficient and accurate?

Yes.

* Does it do what it says, correctly?

Yes.

However, the docs should mention that rate limiting applies to each
walsender individually, not globally. I tried this on a freshly
created database:

$ time pg_basebackup -D data2 -r 1M -X stream -h 127.0.0.1
real    0m26.508s
user    0m0.054s
sys    0m0.360s

The source database had 3 WAL files in pg_xlog, and one of them was
also streamed. The final size of "data2" was 43MB, or 26MB without pg_xlog,
i.e. without the "-X stream" option. The backup used 2 walsenders
in parallel (one for WAL), which is known behavior.
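
As a rough sanity check of the timing above, the expected duration is
simply the amount of throttled data divided by the rate. The sketch
below is only an illustration; expected_backup_seconds() is a
hypothetical helper, and it assumes that "1M" means 1048576 bytes per
second (which is what the two shifts in parse_max_rate() produce).

```c
#include <stdint.h>

/*
 * Lower bound on the transfer time, in seconds, for data_bytes sent
 * through a single walsender limited to max_rate bytes per second.
 */
static double
expected_backup_seconds(uint64_t data_bytes, uint32_t max_rate)
{
	return (double) data_bytes / (double) max_rate;
}
```

For 26MB at 1M this gives 26 seconds, close to the measured 26.5 s,
while 43MB would need 43 seconds. That suggests the streamed WAL was
not counted against the same limit.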

* Does it produce compiler warnings?

No.

* Can you make it crash?

No.

Consider the changes to the code in the context of the project as a whole:

* Is everything done in a way that fits together coherently with other 
features/modules?

Yes.

* Are there interdependencies that can cause problems?

No.

One more note: this chunk should be submitted separately as a comment fix:

diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index c3c71b7..5736fd8 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1288,7 +1288,7 @@ GetCurrentTimestamp(void)
 /*
  * GetCurrentIntegerTimestamp -- get the current operating system time as int64
  *
- * Result is the number of milliseconds since the Postgres epoch. If compiled
+ * Result is the number of microseconds since the Postgres epoch. If compiled
  * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(),
  * and is implemented as a macro.
  */

Best regards,
Zoltán Böszörményi

--
----------------------------------
Zoltán Böszörményi
Cybertec Schönig & Schönig GmbH
Gröhrmühlgasse 26
A-2700 Wiener Neustadt, Austria
Web: http://www.postgresql-support.de
     http://www.postgresql.at/

diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index c379df5..73bb999 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -189,6 +189,22 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-r</option></term>
+      <term><option>--max-rate</option></term>
+      <listitem>
+       <para>
+        The maximum amount of data transferred from the server per second.
+        The purpose is to limit the impact of <application>pg_basebackup</application>
+        on a running master server while transferring the data directory.
+       </para>
+       <para>
+        Suffixes <literal>k</literal> (kilobytes) and <literal>M</literal>
+        (megabytes) are accepted. For example: <literal>10M</literal>
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-R</option></term>
       <term><option>--write-recovery-conf</option></term>
       <listitem>
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index ba8d173..b2f10c3 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -33,6 +33,7 @@
 #include "utils/builtins.h"
 #include "utils/elog.h"
 #include "utils/ps_status.h"
+#include "utils/timestamp.h"
 #include "pgtar.h"
 
 typedef struct
@@ -42,6 +43,7 @@ typedef struct
 	bool		fastcheckpoint;
 	bool		nowait;
 	bool		includewal;
+	uint32		maxrate;
 } basebackup_options;
 
 
@@ -59,6 +61,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
 static int	compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
 
 /* Was the backup currently in-progress initiated in recovery mode? */
 static bool backup_started_in_recovery = false;
@@ -68,6 +71,41 @@ static bool backup_started_in_recovery = false;
  */
 #define TAR_SEND_SIZE 32768
 
+
+/*
+ * The maximum amount of data per second - bounds of the user input.
+ *
+ * If the maximum should be increased to more than 4 GB, uint64 must
+ * be introduced for the related variables. However such high values have
+ * little to do with throttling.
+ */
+#define MAX_RATE_LOWER	32768
+#define MAX_RATE_UPPER	(1024 << 20)
+
+/*
+ * Transfer rate is only measured when this number of bytes has been sent.
+ * (Too frequent checks would impose too high CPU overhead.)
+ *
+ * The default value is used unless it'd result in too frequent checks.
+ */
+#define THROTTLING_SAMPLE_MIN	32768
+
+/* The maximum number of checks per second.  */
+#define THROTTLING_MAX_FREQUENCY	128
+
+/* The actual value, transfer of which may cause sleep. */
+static uint32 throttling_sample;
+
+/* Amount of data already transferred but not yet throttled.  */
+static int32 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
+
 typedef struct
 {
 	char	   *oid;
@@ -171,6 +209,31 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
 		/* Send tablespace header */
 		SendBackupHeader(tablespaces);
 
+		if (opt->maxrate > 0)
+		{
+			throttling_sample = opt->maxrate / THROTTLING_MAX_FREQUENCY;
+
+			/* Don't measure too small pieces of data. */
+			if (throttling_sample < THROTTLING_SAMPLE_MIN)
+				throttling_sample = THROTTLING_SAMPLE_MIN;
+
+			/*
+			 * opt->maxrate is bytes per second. Thus the expression in
+			 * brackets is microseconds per byte.
+			 */
+			elapsed_min_unit = throttling_sample *
+				((double) USECS_PER_SEC / opt->maxrate);
+
+			/* Enable throttling. */
+			throttling_counter = 0;
+
+			/* The 'real data' starts now (header was ignored). */
+			throttled_last = GetCurrentIntegerTimestamp();
+		}
+		else
+			/* Disable throttling. */
+			throttling_counter = -1;
+
 		/* Send off our tablespaces one by one */
 		foreach(lc, tablespaces)
 		{
@@ -468,6 +531,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_fast = false;
 	bool		o_nowait = false;
 	bool		o_wal = false;
+	bool		o_maxrate = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	foreach(lopt, options)
@@ -519,6 +583,29 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			opt->includewal = true;
 			o_wal = true;
 		}
+		else if (strcmp(defel->defname, "maxrate") == 0)
+		{
+			long		maxrate;
+
+			if (o_maxrate)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			maxrate = intVal(defel->arg);
+
+			opt->maxrate = (uint32) maxrate;
+			if (opt->maxrate > 0 &&
+			(opt->maxrate < MAX_RATE_LOWER || opt->maxrate > MAX_RATE_UPPER))
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+				  errmsg("transfer rate %u bytes per second is out of range",
+						 opt->maxrate),
+				 errhint("The accepted range is %u through %u kB per second",
+						 MAX_RATE_LOWER >> 10, MAX_RATE_UPPER >> 10)));
+			}
+			o_maxrate = true;
+		}
 		else
 			elog(ERROR, "option \"%s\" not recognized",
 				 defel->defname);
@@ -1019,6 +1106,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 			   (errmsg("base backup could not send data, aborting backup")));
 
 		len += cnt;
+		throttle(cnt);
 
 		if (len >= statbuf->st_size)
 		{
@@ -1040,10 +1128,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
 			cnt = Min(sizeof(buf), statbuf->st_size - len);
 			pq_putmessage('d', buf, cnt);
 			len += cnt;
+			throttle(cnt);
 		}
 	}
 
-	/* Pad to 512 byte boundary, per tar format requirements */
+	/*
+	 * Pad to 512 byte boundary, per tar format requirements. (This small
+	 * piece of data is probably not worth throttling.)
+	 */
 	pad = ((len + 511) & ~511) - len;
 	if (pad > 0)
 	{
@@ -1069,3 +1161,47 @@ _tarWriteHeader(const char *filename, const char *linktarget,
 
 	pq_putmessage('d', h, 512);
 }
+
+static void
+throttle(size_t increment)
+{
+	int64		elapsed,
+				elapsed_min,
+				sleep;
+
+	if (throttling_counter < 0)
+		return;
+
+	throttling_counter += increment;
+	if (throttling_counter < throttling_sample)
+		return;
+
+	/* Time elapsed since the last measuring (and possible wake up). */
+	elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+	/* How much should have elapsed at minimum? */
+	elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+	sleep = elapsed_min - elapsed;
+	/* Only sleep if the transfer is faster than it should be. */
+	if (sleep > 0)
+
+		/*
+		 * THROTTLING_SAMPLE_MIN / MAX_RATE_LOWER (in seconds) should be the
+		 * longest possible time to sleep. Thus the cast to long is safe.
+		 */
+		pg_usleep((long) sleep);
+	else
+
+		/*
+		 * The actual transfer rate is below the limit. Negative value would
+		 * distort the adjustment of throttled_last.
+		 */
+		sleep = 0;
+
+	/*
+	 * Only whole multiples of throttling_sample were processed. The rest will
+	 * be done during the next call of this function.
+	 */
+	throttling_counter %= throttling_sample;
+	/* Once the (possible) sleep ends, new period starts. */
+	throttled_last += elapsed + sleep;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 8c83780..1c2c31c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -78,6 +78,7 @@ Node *replication_parse_result;
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
+%token K_MAX_RATE
 %token K_WAL
 %token K_TIMELINE
 
@@ -116,7 +117,7 @@ identify_system:
 			;
 
 /*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
  */
 base_backup:
 			K_BASE_BACKUP base_backup_opt_list
@@ -156,6 +157,11 @@ base_backup_opt:
 				  $$ = makeDefElem("nowait",
 						   (Node *)makeInteger(TRUE));
 				}
+			| K_MAX_RATE UCONST
+				{
+				  $$ = makeDefElem("maxrate",
+						   (Node *)makeInteger($2));
+				}
 			;
 
 /*
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 3d930f1..b2d5e3b 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -71,6 +71,7 @@ IDENTIFY_SYSTEM		{ return K_IDENTIFY_SYSTEM; }
 LABEL			{ return K_LABEL; }
 NOWAIT			{ return K_NOWAIT; }
 PROGRESS			{ return K_PROGRESS; }
+MAX_RATE		{ return K_MAX_RATE; }
 WAL			{ return K_WAL; }
 TIMELINE			{ return K_TIMELINE; }
 START_REPLICATION	{ return K_START_REPLICATION; }
diff --git a/src/backend/utils/adt/timestamp.c b/src/backend/utils/adt/timestamp.c
index c3c71b7..5736fd8 100644
--- a/src/backend/utils/adt/timestamp.c
+++ b/src/backend/utils/adt/timestamp.c
@@ -1288,7 +1288,7 @@ GetCurrentTimestamp(void)
 /*
  * GetCurrentIntegerTimestamp -- get the current operating system time as int64
  *
- * Result is the number of milliseconds since the Postgres epoch. If compiled
+ * Result is the number of microseconds since the Postgres epoch. If compiled
  * with --enable-integer-datetimes, this is identical to GetCurrentTimestamp(),
  * and is implemented as a macro.
  */
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 6706c0c..7a86cd7 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -46,6 +46,7 @@ bool		streamwal = false;
 bool		fastcheckpoint = false;
 bool		writerecoveryconf = false;
 int			standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+uint32		maxrate = 0;		/* No limit by default. */
 
 /* Progress counters */
 static uint64 totalsize;
@@ -76,6 +77,7 @@ static PQExpBuffer recoveryconfcontents = NULL;
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, const char *filename);
+static uint32 parse_max_rate(char *src);
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
@@ -111,6 +113,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+	printf(_("  -r, --max-rate         maximum transfer rate to transfer data directory\n"));
 	printf(_("  -R, --write-recovery-conf\n"
 			 "                         write recovery.conf after backup\n"));
 	printf(_("  -x, --xlog             include required WAL files in backup (fetch mode)\n"));
@@ -476,6 +479,94 @@ progress_report(int tablespacenum, const char *filename)
 }
 
 
+static uint32
+parse_max_rate(char *src)
+{
+	int			factor;
+	char	   *after_num;
+	int64		result;
+	int			errno_copy;
+
+	result = strtol(src, &after_num, 0);
+	errno_copy = errno;
+	if (src == after_num)
+	{
+		fprintf(stderr, _("%s: transfer rate \"%s\" is not a valid integer value\n"), progname, src);
+		exit(1);
+	}
+
+
+	/*
+	 * Evaluate (optional) suffix.
+	 *
+	 * after_num should now be right behind the numeric value.
+	 */
+	factor = 1;
+	switch (*after_num)
+	{
+			/*
+			 * Only the following suffixes are allowed. It's not too useful to
+			 * restrict the rate to gigabytes: such a rate will probably bring
+			 * significant impact on the master anyway, so the throttling
+			 * won't help much.
+			 */
+		case 'M':
+			factor <<= 10;
+		case 'k':
+			factor <<= 10;
+			after_num++;
+			break;
+
+		default:
+
+			/*
+			 * If there's no suffix, byte is the unit. Possible invalid letter
+			 * will make conversion to integer fail.
+			 */
+			break;
+	}
+
+	/* The rest can only consist of white space. */
+	while (*after_num != '\0')
+	{
+		if (!isspace(*after_num))
+		{
+			fprintf(stderr, _("%s: invalid transfer rate \"%s\"\n"), progname, src);
+			exit(1);
+		}
+		after_num++;
+	}
+
+	/* Some checks only make sense when we know that the suffix is correct. */
+	if (result <= 0)
+	{
+		/*
+		 * Reject obviously wrong values here. Exact check of the range to be
+		 * done on server.
+		 */
+		fprintf(stderr, _("%s: transfer must be greater than zero\n"), progname);
+		exit(1);
+	}
+	if (errno_copy == ERANGE || result != (uint64) ((uint32) result))
+	{
+		fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+		exit(1);
+	}
+
+	if (factor > 1)
+	{
+		result *= factor;
+		/* Check the integer range once again. */
+		if (result != (uint64) ((uint32) result))
+		{
+			fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+			exit(1);
+		}
+	}
+	return (uint32) result;
+}
+
+
 /*
  * Write a piece of tar data
  */
@@ -1310,6 +1401,7 @@ BaseBackup(void)
 	uint32		starttli;
 	char		current_path[MAXPGPATH];
 	char		escaped_label[MAXPGPATH];
+	char		maxrate_clause[MAXPGPATH];
 	int			i;
 	char		xlogstart[64];
 	char		xlogend[64];
@@ -1382,13 +1474,18 @@ BaseBackup(void)
 	 * Start the actual backup
 	 */
 	PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
+	if (maxrate > 0)
+		snprintf(maxrate_clause, sizeof(maxrate_clause), "MAX_RATE %u", maxrate);
+	else
+		maxrate_clause[0] = '\0';
 	snprintf(current_path, sizeof(current_path),
-			 "BASE_BACKUP LABEL '%s' %s %s %s %s",
+			 "BASE_BACKUP LABEL '%s' %s %s %s %s %s",
 			 escaped_label,
 			 showprogress ? "PROGRESS" : "",
 			 includewal && !streamwal ? "WAL" : "",
 			 fastcheckpoint ? "FAST" : "",
-			 includewal ? "NOWAIT" : "");
+			 includewal ? "NOWAIT" : "",
+			 maxrate_clause);
 
 	if (PQsendQuery(conn, current_path) == 0)
 	{
@@ -1657,6 +1754,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
+		{"max-rate", required_argument, NULL, 'r'},
 		{"write-recovery-conf", no_argument, NULL, 'R'},
 		{"xlog", no_argument, NULL, 'x'},
 		{"xlog-method", required_argument, NULL, 'X'},
@@ -1697,7 +1795,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:F:RxX:l:zZ:d:c:h:p:U:s:wWvP",
+	while ((c = getopt_long(argc, argv, "D:F:r:RxX:l:zZ:d:c:h:p:U:s:wWvP",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1718,6 +1816,9 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'r':
+				maxrate = parse_max_rate(optarg);
+				break;
 			case 'R':
 				writerecoveryconf = true;
 				break;