Thanks for checking. The new version addresses your findings.
// Antonin Houska (Tony)
On 12/09/2013 03:49 PM, Fujii Masao wrote:
> On Fri, Dec 6, 2013 at 6:43 PM, Boszormenyi Zoltan <[email protected]> wrote:
>> Hi,
>>
>> 2013-12-05 15:36 keltezéssel, Antonin Houska írta:
>>
>>> On 12/02/2013 02:23 PM, Boszormenyi Zoltan wrote:
>>>>
>>>> Hi,
>>>>
>>>> I am reviewing your patch.
>>>
>>> Thanks. New version attached.
>>
>>
>> I have reviewed and tested it and marked it as ready for committer.
>
> Here are the review comments:
>
> + <term><option>-r</option></term>
> + <term><option>--max-rate</option></term>
>
> You need to add something like <replaceable
> class="parameter">rate</replaceable>.
>
> + The purpose is to limit impact of
> <application>pg_basebackup</application>
> + on a running master server.
>
> s/"master server"/"server" because we can take a backup from also the standby.
>
> I think that it's better to document the default value and the accepted range
> of
> the rate that we can specify.
>
> You need to change the protocol.sgml because you changed BASE_BACKUP
> replication command.
>
> + printf(_(" -r, --max-rate maximum transfer rate to
> transfer data directory\n"));
>
> You need to add something like =RATE just after --max-rate.
>
> + result = strtol(src, &after_num, 0);
>
> errno should be set to 0 just before calling strtol().
>
> + if (errno_copy == ERANGE || result != (uint64) ((uint32) result))
> + {
> + fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer
> range\n"), progname, src);
> + exit(1);
> + }
>
> We can move this check after the check of "src == after_num" like
> parse_int() in guc.c does.
> If we do this, the local variable 'errno_copy' is no longer necessary.
>
> I think that it's better to output the hint message like "Valid units for
> the transfer rate are \"k\" and \"M\"." when a user specified wrong unit.
>
> + /*
> + * THROTTLING_SAMPLE_MIN / MAX_RATE_LOWER (in seconds) should be the
> + * longest possible time to sleep. Thus the cast to long is safe.
> + */
> + pg_usleep((long) sleep);
>
> It's better to use the latch here so that we can interrupt immediately.
>
> Regards,
>
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 0b2e60e..2f24fff 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -1719,7 +1719,7 @@ The commands accepted in walsender mode are:
</varlistentry>
<varlistentry>
- <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>]</term>
+ <term>BASE_BACKUP [<literal>LABEL</literal> <replaceable>'label'</replaceable>] [<literal>PROGRESS</literal>] [<literal>FAST</literal>] [<literal>WAL</literal>] [<literal>NOWAIT</literal>] [<literal>MAX_RATE</literal> <replaceable>rate</replaceable>]</term>
<listitem>
<para>
Instructs the server to start streaming a base backup.
@@ -1787,7 +1787,23 @@ The commands accepted in walsender mode are:
the waiting and the warning, leaving the client responsible for
ensuring the required log is available.
</para>
- </listitem>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><literal>MAX_RATE</literal> <replaceable>rate</replaceable></term>
+ <listitem>
+ <para>
+ Limit the maximum amount of data transferred to the client per unit of
+ time. The expected unit is bytes per second. If
+ <literal>MAX_RATE</literal> is passed, it must be either equal to
+ zero or fall within the range 32 kB through 1 GB (inclusive). If zero is
+ passed or the option is not passed at all, no restriction is imposed
+ on the transfer.
+ </para>
+ <para>
+ <literal>MAX_RATE</literal> does not affect WAL streaming.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
</para>
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index c379df5..caede77 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -189,6 +189,28 @@ PostgreSQL documentation
</varlistentry>
<varlistentry>
+ <term><option>-r <replaceable class="parameter">rate</replaceable></option></term>
+ <term><option>--max-rate=<replaceable class="parameter">rate</replaceable></option></term>
+ <listitem>
+ <para>
+ The maximum amount of data transferred from the server per second.
+ The purpose is to limit the impact of <application>pg_basebackup</application>
+ on the running server.
+ </para>
+ <para>
+ This option always affects transfer of the data directory. Transfer of
+ WAL files is only affected if the collection method is <literal>fetch</literal>.
+ </para>
+ <para>
+ A value between <literal>32 kB</literal> and <literal>1024 MB</literal>
+ is expected. The suffixes <literal>k</literal> (kilobytes) and
+ <literal>M</literal> (megabytes) are accepted, for example
+ <literal>10M</literal>. Without a suffix, the value is in bytes.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term><option>-R</option></term>
<term><option>--write-recovery-conf</option></term>
<listitem>
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index ba8d173..e3ff2cf 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -30,9 +30,11 @@
#include "replication/walsender_private.h"
#include "storage/fd.h"
#include "storage/ipc.h"
+#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/ps_status.h"
+#include "utils/timestamp.h"
#include "pgtar.h"
typedef struct
@@ -42,6 +44,7 @@ typedef struct
bool fastcheckpoint;
bool nowait;
bool includewal;
+ uint32 maxrate;
} basebackup_options;
@@ -59,6 +62,7 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
static int compareWalFileNames(const void *a, const void *b);
+static void throttle(size_t increment);
/* Was the backup currently in-progress initiated in recovery mode? */
static bool backup_started_in_recovery = false;
@@ -68,6 +72,42 @@ static bool backup_started_in_recovery = false;
*/
#define TAR_SEND_SIZE 32768
+
+/*
+ * The maximum amount of data per second - bounds of the user input.
+ *
+ * If the maximum should be increased to more than 4 GB, uint64 must
+ * be introduced for the related variables. However such high values have
+ * little to do with throttling.
+ */
+#define MAX_RATE_LOWER 32768
+#define MAX_RATE_UPPER (1024 << 20)
+
+/*
+ * Transfer rate is only measured when this number of bytes has been sent.
+ * (Too frequent checks would impose too high CPU overhead.)
+ *
+ * The default value is used unless it'd result in too frequent checks.
+ */
+#define THROTTLING_SAMPLE_MIN 32768
+
+/* The maximum number of checks per second. */
+#define THROTTLING_MAX_FREQUENCY 128
+
+/* The actual number of bytes whose transfer may trigger a sleep. */
+static uint32 throttling_sample;
+
+/* Amount of data already transferred but not yet throttled. */
+static int32 throttling_counter;
+
+/* The minimum time required to transfer throttling_sample bytes. */
+static int64 elapsed_min_unit;
+
+/* The last check of the transfer rate. */
+static int64 throttled_last;
+
+static Latch throttling_latch;
+
typedef struct
{
char *oid;
@@ -171,6 +211,35 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
/* Send tablespace header */
SendBackupHeader(tablespaces);
+ if (opt->maxrate > 0)
+ {
+ throttling_sample = opt->maxrate / THROTTLING_MAX_FREQUENCY;
+
+ /* Don't measure too small pieces of data. */
+ if (throttling_sample < THROTTLING_SAMPLE_MIN)
+ throttling_sample = THROTTLING_SAMPLE_MIN;
+
+ /*
+ * opt->maxrate is bytes per second. Thus the expression in
+ * brackets is microseconds per byte.
+ */
+ elapsed_min_unit = throttling_sample *
+ ((double) USECS_PER_SEC / opt->maxrate);
+
+ /* Enable throttling. */
+ throttling_counter = 0;
+
+ /* The 'real data' starts now (header was ignored). */
+ throttled_last = GetCurrentIntegerTimestamp();
+
+ InitLatch(&throttling_latch);
+ }
+ else
+ {
+ /* Disable throttling. */
+ throttling_counter = -1;
+ }
+
/* Send off our tablespaces one by one */
foreach(lc, tablespaces)
{
@@ -398,6 +467,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
(errmsg("base backup could not send data, aborting backup")));
len += cnt;
+ throttle(cnt);
+
if (len == XLogSegSize)
break;
}
@@ -468,6 +539,7 @@ parse_basebackup_options(List *options, basebackup_options *opt)
bool o_fast = false;
bool o_nowait = false;
bool o_wal = false;
+ bool o_maxrate = false;
MemSet(opt, 0, sizeof(*opt));
foreach(lopt, options)
@@ -519,6 +591,29 @@ parse_basebackup_options(List *options, basebackup_options *opt)
opt->includewal = true;
o_wal = true;
}
+ else if (strcmp(defel->defname, "maxrate") == 0)
+ {
+ long maxrate;
+
+ if (o_maxrate)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("duplicate option \"%s\"", defel->defname)));
+ maxrate = intVal(defel->arg);
+
+ opt->maxrate = (uint32) maxrate;
+ if (opt->maxrate > 0 &&
+ (opt->maxrate < MAX_RATE_LOWER || opt->maxrate > MAX_RATE_UPPER))
+ {
+ ereport(ERROR,
+ (errcode(ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE),
+ errmsg("transfer rate %u bytes per second is out of range",
+ opt->maxrate),
+ errhint("The accepted range is %u through %u kB per second",
+ MAX_RATE_LOWER >> 10, MAX_RATE_UPPER >> 10)));
+ }
+ o_maxrate = true;
+ }
else
elog(ERROR, "option \"%s\" not recognized",
defel->defname);
@@ -1019,6 +1114,7 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
(errmsg("base backup could not send data, aborting backup")));
len += cnt;
+ throttle(cnt);
if (len >= statbuf->st_size)
{
@@ -1040,10 +1136,14 @@ sendFile(char *readfilename, char *tarfilename, struct stat * statbuf,
cnt = Min(sizeof(buf), statbuf->st_size - len);
pq_putmessage('d', buf, cnt);
len += cnt;
+ throttle(cnt);
}
}
- /* Pad to 512 byte boundary, per tar format requirements */
+ /*
+ * Pad to 512 byte boundary, per tar format requirements. (This small
+ * piece of data is probably not worth throttling.)
+ */
pad = ((len + 511) & ~511) - len;
if (pad > 0)
{
@@ -1069,3 +1169,52 @@ _tarWriteHeader(const char *filename, const char *linktarget,
pq_putmessage('d', h, 512);
}
+
+static void
+throttle(size_t increment)
+{
+ int64 elapsed,
+ elapsed_min,
+ sleep;
+
+ if (throttling_counter < 0)
+ return;
+
+ throttling_counter += increment;
+ if (throttling_counter < throttling_sample)
+ return;
+
+ /* Time elapsed since the last measuring (and possible wake up). */
+ elapsed = GetCurrentIntegerTimestamp() - throttled_last;
+ /* How much should have elapsed at minimum? */
+ elapsed_min = elapsed_min_unit * (throttling_counter / throttling_sample);
+ sleep = elapsed_min - elapsed;
+ /* Only sleep if the transfer is faster than it should be. */
+ if (sleep > 0)
+ {
+ ResetLatch(&throttling_latch);
+ /*
+ * THROTTLING_SAMPLE_MIN / MAX_RATE_LOWER (in seconds) should be the
+ * longest possible time to sleep. Thus the cast to long is safe.
+ */
+ WaitLatch(&throttling_latch, WL_TIMEOUT | WL_POSTMASTER_DEATH,
+ (long) (sleep / 1000));
+ }
+ else
+ {
+
+ /*
+ * The actual transfer rate is below the limit. Negative value would
+ * distort the adjustment of throttled_last.
+ */
+ sleep = 0;
+ }
+
+ /*
+ * Only the whole multiples of throttling_sample processed. The rest will
+ * be done during the next call of this function.
+ */
+ throttling_counter %= throttling_sample;
+ /* Once the (possible) sleep ends, new period starts. */
+ throttled_last += elapsed + sleep;
+}
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 8c83780..1c2c31c 100644
--- a/src/backend/replication/repl_gram.y
+++ b/src/backend/replication/repl_gram.y
@@ -78,6 +78,7 @@ Node *replication_parse_result;
%token K_PROGRESS
%token K_FAST
%token K_NOWAIT
+%token K_MAX_RATE
%token K_WAL
%token K_TIMELINE
@@ -116,7 +117,7 @@ identify_system:
;
/*
- * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT]
+ * BASE_BACKUP [LABEL '<label>'] [PROGRESS] [FAST] [WAL] [NOWAIT] [MAX_RATE %d]
*/
base_backup:
K_BASE_BACKUP base_backup_opt_list
@@ -156,6 +157,11 @@ base_backup_opt:
$$ = makeDefElem("nowait",
(Node *)makeInteger(TRUE));
}
+ | K_MAX_RATE UCONST
+ {
+ $$ = makeDefElem("maxrate",
+ (Node *)makeInteger($2));
+ }
;
/*
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index 3d930f1..b2d5e3b 100644
--- a/src/backend/replication/repl_scanner.l
+++ b/src/backend/replication/repl_scanner.l
@@ -71,6 +71,7 @@ IDENTIFY_SYSTEM { return K_IDENTIFY_SYSTEM; }
LABEL { return K_LABEL; }
NOWAIT { return K_NOWAIT; }
PROGRESS { return K_PROGRESS; }
+MAX_RATE { return K_MAX_RATE; }
WAL { return K_WAL; }
TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 6706c0c..997b48ee 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -46,6 +46,7 @@ bool streamwal = false;
bool fastcheckpoint = false;
bool writerecoveryconf = false;
int standby_message_timeout = 10 * 1000; /* 10 sec = default */
+uint32 maxrate = 0; /* No limit by default. */
/* Progress counters */
static uint64 totalsize;
@@ -76,6 +77,7 @@ static PQExpBuffer recoveryconfcontents = NULL;
static void usage(void);
static void verify_dir_is_empty_or_create(char *dirname);
static void progress_report(int tablespacenum, const char *filename);
+static uint32 parse_max_rate(char *src);
static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
@@ -111,6 +113,7 @@ usage(void)
printf(_("\nOptions controlling the output:\n"));
printf(_(" -D, --pgdata=DIRECTORY receive base backup into directory\n"));
printf(_(" -F, --format=p|t output format (plain (default), tar)\n"));
+ printf(_(" -r, --max-rate=RATE maximum transfer rate to transfer data directory\n"));
printf(_(" -R, --write-recovery-conf\n"
" write recovery.conf after backup\n"));
printf(_(" -x, --xlog include required WAL files in backup (fetch mode)\n"));
@@ -476,6 +479,91 @@ progress_report(int tablespacenum, const char *filename)
}
+static uint32
+parse_max_rate(char *src)
+{
+ int factor;
+ char *after_num;
+ int64 result;
+
+ errno = 0;
+ result = strtol(src, &after_num, 0);
+ if (src == after_num)
+ {
+ fprintf(stderr, _("%s: transfer rate \"%s\" is not a valid integer value\n"), progname, src);
+ exit(1);
+ }
+ if (errno == ERANGE)
+ {
+ fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+ exit(1);
+ }
+
+
+ /*
+ * Evaluate (optional) suffix.
+ *
+ * after_num should now be right behind the numeric value.
+ */
+ factor = 1;
+ switch (*after_num)
+ {
+ /*
+ * Only the following suffixes are allowed. It's not too useful to
+ * restrict the rate to gigabytes: such a rate will probably bring
+ * significant impact on the master anyway, so the throttling
+ * won't help much.
+ */
+ case 'M':
+ factor <<= 10;
+ case 'k':
+ factor <<= 10;
+ after_num++;
+ break;
+
+ default:
+
+ /*
+ * If there's no suffix, byte is the unit. Possible invalid letter
+ * will be detected later.
+ */
+ break;
+ }
+
+ /* The rest can only consist of white space. */
+ while (*after_num != '\0')
+ {
+ if (!isspace(*after_num))
+ {
+ fprintf(stderr, _("%s: valid units for the transfer rate are \"k\" and \"M\"\n"), progname);
+ exit(1);
+ }
+ after_num++;
+ }
+
+ if (result <= 0)
+ {
+ /*
+ * Reject obviously wrong values here. Exact check of the range to be
+ * done on server.
+ */
+ fprintf(stderr, _("%s: transfer rate must be greater than zero\n"), progname);
+ exit(1);
+ }
+ if (factor > 1)
+ {
+ result *= factor;
+ /* Check the integer range once again. */
+ if (result != (uint64) ((uint32) result))
+ {
+ fprintf(stderr, _("%s: transfer rate \"%s\" exceeds integer range\n"), progname, src);
+ exit(1);
+ }
+ }
+ return (uint32) result;
+}
+
+
/*
* Write a piece of tar data
*/
@@ -1310,6 +1398,7 @@ BaseBackup(void)
uint32 starttli;
char current_path[MAXPGPATH];
char escaped_label[MAXPGPATH];
+ char maxrate_clause[MAXPGPATH];
int i;
char xlogstart[64];
char xlogend[64];
@@ -1382,13 +1471,18 @@ BaseBackup(void)
* Start the actual backup
*/
PQescapeStringConn(conn, escaped_label, label, sizeof(escaped_label), &i);
+ if (maxrate > 0)
+ snprintf(maxrate_clause, sizeof(maxrate_clause), "MAX_RATE %u", maxrate);
+ else
+ maxrate_clause[0] = '\0';
snprintf(current_path, sizeof(current_path),
- "BASE_BACKUP LABEL '%s' %s %s %s %s",
+ "BASE_BACKUP LABEL '%s' %s %s %s %s %s",
escaped_label,
showprogress ? "PROGRESS" : "",
includewal && !streamwal ? "WAL" : "",
fastcheckpoint ? "FAST" : "",
- includewal ? "NOWAIT" : "");
+ includewal ? "NOWAIT" : "",
+ maxrate_clause);
if (PQsendQuery(conn, current_path) == 0)
{
@@ -1657,6 +1751,7 @@ main(int argc, char **argv)
{"pgdata", required_argument, NULL, 'D'},
{"format", required_argument, NULL, 'F'},
{"checkpoint", required_argument, NULL, 'c'},
+ {"max-rate", required_argument, NULL, 'r'},
{"write-recovery-conf", no_argument, NULL, 'R'},
{"xlog", no_argument, NULL, 'x'},
{"xlog-method", required_argument, NULL, 'X'},
@@ -1697,7 +1792,7 @@ main(int argc, char **argv)
}
}
- while ((c = getopt_long(argc, argv, "D:F:RxX:l:zZ:d:c:h:p:U:s:wWvP",
+ while ((c = getopt_long(argc, argv, "D:F:r:RxX:l:zZ:d:c:h:p:U:s:wWvP",
long_options, &option_index)) != -1)
{
switch (c)
@@ -1718,6 +1813,9 @@ main(int argc, char **argv)
exit(1);
}
break;
+ case 'r':
+ maxrate = parse_max_rate(optarg);
+ break;
case 'R':
writerecoveryconf = true;
break;
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers