Hello,
the purpose of this patch is to limit the impact of pg_basebackup on a running server. Feedback is appreciated.

// Antonin Houska (Tony)
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index eb0c1d6..3b7ecfd 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -189,6 +189,22 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-r</option></term>
+      <term><option>--max-rate</option></term>
+      <listitem>
+       <para>
+	The maximum amount of data transferred from the server per second.
+	The purpose is to limit the impact of
+	<application>pg_basebackup</application> on a running master server.
+       </para>
+       <para>
+	Suffixes <literal>k</literal> (kilobytes) and <literal>m</literal>
+	(megabytes) are accepted, for example <literal>10m</literal>; without a
+	suffix the value is in bytes. Valid rates are 32 kB to 1 GB per second.
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-R</option></term>
       <term><option>--write-recovery-conf</option></term>
       <listitem>
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index a1e12a8..7fb2d78 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -45,11 +45,23 @@ bool		streamwal = false;
 bool		fastcheckpoint = false;
 bool		writerecoveryconf = false;
 int			standby_message_timeout = 10 * 1000;		/* 10 sec = default */
+double		max_rate = 0;		/* Maximum bytes per second. 0 implies
+								 * unlimited rate. */
+
+#define MAX_RATE_LOWER	0x00008000		/* 32 kB */
+#define MAX_RATE_UPPER	0x40000000		/* 1 GB */
+/*
+ * We shouldn't measure time whenever a small piece of data (e.g. TAR file
+ * header) has arrived. That would introduce high CPU overhead.
+ */
+#define RATE_MIN_SAMPLE 32768
 
 /* Progress counters */
 static uint64 totalsize;
 static uint64 totaldone;
+static uint64 last_measured;
 static int	tablespacecount;
+static int64 last_measured_time;
 
 /* Pipe to communicate with background wal receiver process */
 #ifndef WIN32
@@ -72,9 +84,14 @@ static volatile LONG has_xlogendptr = 0;
 static PQExpBuffer recoveryconfcontents = NULL;
 
 /* Function headers */
+
+
+
 static void usage(void);
 static void verify_dir_is_empty_or_create(char *dirname);
 static void progress_report(int tablespacenum, const char *filename);
+static double parse_max_rate(char *src);
+static void enforce_max_rate();
 
 static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
 static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
@@ -110,6 +127,7 @@ usage(void)
 	printf(_("\nOptions controlling the output:\n"));
 	printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
 	printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+	printf(_("  -r, --max-rate         maximum transfer rate between server and client\n"));
 	printf(_("  -R, --write-recovery-conf\n"
 			 "                         write recovery.conf after backup\n"));
 	printf(_("  -x, --xlog             include required WAL files in backup (fetch mode)\n"));
@@ -473,6 +491,113 @@ progress_report(int tablespacenum, const char *filename)
 	fprintf(stderr, "\r");
 }
 
+/*
+ * Parse the argument of --max-rate: a number, an optional 'k' or 'm' suffix
+ * (case-insensitive; multiplies by 1024 or 1024 * 1024) and optional trailing
+ * white space. Returns the rate in bytes per second.
+ *
+ * If the value is malformed or outside MAX_RATE_LOWER..MAX_RATE_UPPER, an
+ * error is printed to stderr and the program exits with status 1.
+ */
+static double
+parse_max_rate(char *src)
+{
+	int			factor;
+	char	   *after_num;
+	double		result;
+
+	result = strtod(src, &after_num);
+	if (src == after_num)
+	{
+		fprintf(stderr, _("%s: invalid transfer rate \"%s\"\n"), progname, src);
+		exit(1);
+	}
+
+	/*
+	 * Evaluate the optional suffix right behind the number. Gigabytes are not
+	 * offered: such a rate would hardly limit the impact on the master.
+	 * The cast is needed because <ctype.h> functions are undefined for
+	 * negative char values.
+	 */
+	factor = 1;
+	switch (tolower((unsigned char) *after_num))
+	{
+		case 'm':
+			factor <<= 10;
+			/* FALLTHROUGH */
+		case 'k':
+			factor <<= 10;
+			after_num++;
+			break;
+		default:
+			/* No suffix: bytes. Stray characters are rejected below. */
+			break;
+	}
+
+	/* The rest can only consist of white space. */
+	while (*after_num != '\0')
+	{
+		if (!isspace((unsigned char) *after_num))
+		{
+			fprintf(stderr, _("%s: invalid transfer rate \"%s\"\n"), progname, src);
+			exit(1);
+		}
+		after_num++;
+	}
+
+	result *= factor;
+	/* Negated conjunction, so that NaN (accepted by strtod) is rejected too. */
+	if (!(result >= MAX_RATE_LOWER && result <= MAX_RATE_UPPER))
+	{
+		fprintf(stderr, _("%s: transfer rate out of range \"%s\"\n"), progname, src);
+		exit(1);
+	}
+	return result;
+}
+
+/*
+ * Throttle the transfer: if more data has arrived since the last sample than
+ * max_rate allows for the elapsed time, sleep for the difference.
+ * Do not call if max_rate == 0 (max_rate is used as a divisor below).
+ */
+static void
+enforce_max_rate(void)
+{
+	int64		min_elapsed,
+				now,
+				elapsed,
+				to_sleep;
+
+	int64		last_chunk;
+
+	last_chunk = totaldone - last_measured;
+
+	/* The measurements shouldn't be more frequent than necessary. */
+	if (last_chunk < RATE_MIN_SAMPLE)
+		return;
+
+	now = localGetCurrentTimestamp();
+	elapsed = now - last_measured_time;
+
+	/*
+	 * max_rate is in bytes per second, thus the expression in brackets is
+	 * microseconds per byte.
+	 */
+	min_elapsed = last_chunk * (USECS_PER_SEC / max_rate);
+
+	/*
+	 * min_elapsed is the minimum time we must have spent to get here. If we
+	 * spent less, let's wait.
+	 */
+	to_sleep = min_elapsed - elapsed;
+	if (to_sleep > 0)
+		pg_usleep((long) to_sleep);
+
+	last_measured = totaldone;
+	/* A negative to_sleep back-dates the baseline: the slack carries over. */
+	last_measured_time = now + to_sleep;
+}
+
 
 /*
  * Write a piece of tar data
@@ -852,6 +977,8 @@ ReceiveTarFile(PGconn *conn, PGresult *res, int rownum)
 		totaldone += r;
 		if (showprogress)
 			progress_report(rownum, filename);
+		if (max_rate > 0)
+			enforce_max_rate();
 	}							/* while (1) */
 
 	if (copybuf != NULL)
@@ -1075,6 +1202,8 @@ ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
 			totaldone += r;
 			if (showprogress)
 				progress_report(rownum, filename);
+			if (max_rate > 0)
+				enforce_max_rate();
 
 			current_len_left -= r;
 			if (current_len_left == 0 && current_padding == 0)
@@ -1446,7 +1575,7 @@ BaseBackup(void)
 	/*
 	 * Sum up the total size, for progress reporting
 	 */
-	totalsize = totaldone = 0;
+	totalsize = totaldone = last_measured = 0;
 	tablespacecount = PQntuples(res);
 	for (i = 0; i < PQntuples(res); i++)
 	{
@@ -1488,6 +1617,8 @@ BaseBackup(void)
 	/*
 	 * Start receiving chunks
 	 */
+	last_measured_time = localGetCurrentTimestamp();
+
 	for (i = 0; i < PQntuples(res); i++)
 	{
 		if (format == 't')
@@ -1651,6 +1782,7 @@ main(int argc, char **argv)
 		{"pgdata", required_argument, NULL, 'D'},
 		{"format", required_argument, NULL, 'F'},
 		{"checkpoint", required_argument, NULL, 'c'},
+		{"max-rate", required_argument, NULL, 'r'},
 		{"write-recovery-conf", no_argument, NULL, 'R'},
 		{"xlog", no_argument, NULL, 'x'},
 		{"xlog-method", required_argument, NULL, 'X'},
@@ -1690,7 +1822,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "D:F:RxX:l:zZ:d:c:h:p:U:s:wWvP",
+	while ((c = getopt_long(argc, argv, "D:F:r:RxX:l:zZ:d:c:h:p:U:s:wWvP",
 							long_options, &option_index)) != -1)
 	{
 		switch (c)
@@ -1711,6 +1843,9 @@ main(int argc, char **argv)
 					exit(1);
 				}
 				break;
+			case 'r':
+				max_rate = parse_max_rate(optarg);
+				break;
 			case 'R':
 				writerecoveryconf = true;
 				break;
diff --git a/src/bin/pg_basebackup/receivelog.c b/src/bin/pg_basebackup/receivelog.c
index d56a4d7..f31300b 100644
--- a/src/bin/pg_basebackup/receivelog.c
+++ b/src/bin/pg_basebackup/receivelog.c
@@ -193,27 +193,6 @@ close_walfile(char *basedir, char *partial_suffix)
 
 
 /*
- * Local version of GetCurrentTimestamp(), since we are not linked with
- * backend code. The protocol always uses integer timestamps, regardless of
- * server setting.
- */
-static int64
-localGetCurrentTimestamp(void)
-{
-	int64		result;
-	struct timeval tp;
-
-	gettimeofday(&tp, NULL);
-
-	result = (int64) tp.tv_sec -
-		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
-
-	result = (result * USECS_PER_SEC) + tp.tv_usec;
-
-	return result;
-}
-
-/*
  * Local version of TimestampDifference(), since we are not linked with
  * backend code.
  */
diff --git a/src/bin/pg_basebackup/streamutil.c b/src/bin/pg_basebackup/streamutil.c
index 1dfb80f..ea71cc0 100644
--- a/src/bin/pg_basebackup/streamutil.c
+++ b/src/bin/pg_basebackup/streamutil.c
@@ -210,3 +210,23 @@ GetConnection(void)
 		return tmpconn;
 	}
 }
+
+/*
+ * Local version of GetCurrentTimestamp(), built on gettimeofday().
+ * The protocol always uses integer timestamps, regardless of server setting.
+ */
+int64
+localGetCurrentTimestamp(void)
+{
+	int64		result;
+	struct timeval tp;
+
+	gettimeofday(&tp, NULL);	/* return value is not checked */
+
+	result = (int64) tp.tv_sec -	/* rebase seconds by the epoch offset */
+		((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
+
+	result = (result * USECS_PER_SEC) + tp.tv_usec;	/* scale, add tv_usec */
+
+	return result;
+}
diff --git a/src/bin/pg_basebackup/streamutil.h b/src/bin/pg_basebackup/streamutil.h
index 77d6b86..2cdebde 100644
--- a/src/bin/pg_basebackup/streamutil.h
+++ b/src/bin/pg_basebackup/streamutil.h
@@ -1,4 +1,7 @@
 #include "libpq-fe.h"
+#include "datatype/timestamp.h"
+
+#include <sys/time.h>
 
 extern const char *progname;
 extern char *connection_string;
@@ -17,3 +20,5 @@ extern PGconn *conn;
 	}
 
 extern PGconn *GetConnection(void);
+
+extern int64 localGetCurrentTimestamp(void);
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to