On 14/01/15 17:22, Gabriele Bartolini wrote:
> 
> My opinion, Marco, is that for version 5 of this patch, you:
> 
> 1) update the information on the wiki (it is outdated - I know you have
> been busy with LSN map optimisation)

Done.

> 2) modify pg_basebackup in order to accept a directory (or tar file) and
> automatically detect the LSN from the backup profile

New version of patch attached. The -I parameter now requires a backup
profile from a previous backup. I've added a sanity check that forbids
incremental file level backups if the base timeline is different from
the current one.

> 3) add the documentation regarding the backup profile and pg_basebackup
> 

Next on my TODO list.

> Once we have all of this, we can continue trying the patch. Some
> unexplored paths are:
> 
> * tablespace usage

I've improved my pg_restorebackup python PoC. It now supports tablespaces.

> * tar format
> * performance impact (in both "read-only" and heavily updated contexts)

From the server point of view, the current code generates a load similar
to normal backup. It only adds an initial scan of any data file to
decide whether it has to send it. Once it finds a single newer page, it
immediately stops scanning and starts sending the file. The IO impact
should not be that big due to the filesystem cache, but I agree with you
that it has to be measured.

Regards,
Marco

-- 
Marco Nenciarini - 2ndQuadrant Italy
PostgreSQL Training, Services and Support
marco.nenciar...@2ndquadrant.it | www.2ndQuadrant.it
From f7cf8b9dd7d32f64a30dafaeeaeb56cbcd2eafff Mon Sep 17 00:00:00 2001
From: Marco Nenciarini <marco.nenciar...@2ndquadrant.it>
Date: Tue, 14 Oct 2014 14:31:28 +0100
Subject: [PATCH] File-based incremental backup v5

Add backup profile to pg_basebackup
INCREMENTAL option implementation
---
 src/backend/access/transam/xlog.c      |   7 +-
 src/backend/access/transam/xlogfuncs.c |   2 +-
 src/backend/replication/basebackup.c   | 335 +++++++++++++++++++++++++++++++--
 src/backend/replication/repl_gram.y    |   6 +
 src/backend/replication/repl_scanner.l |   1 +
 src/bin/pg_basebackup/pg_basebackup.c  | 147 +++++++++++++--
 src/include/access/xlog.h              |   3 +-
 src/include/replication/basebackup.h   |   4 +
 8 files changed, 473 insertions(+), 32 deletions(-)

diff --git a/src/backend/access/transam/xlog.c 
b/src/backend/access/transam/xlog.c
index 629a457..1e50625 100644
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
*************** XLogFileNameP(TimeLineID tli, XLogSegNo 
*** 9249,9255 ****
   * permissions of the calling user!
   */
  XLogRecPtr
! do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                                   char **labelfile)
  {
        bool            exclusive = (labelfile == NULL);
--- 9249,9256 ----
   * permissions of the calling user!
   */
  XLogRecPtr
! do_pg_start_backup(const char *backupidstr, bool fast,
!                                  XLogRecPtr incremental_startpoint, 
TimeLineID *starttli_p,
                                   char **labelfile)
  {
        bool            exclusive = (labelfile == NULL);
*************** do_pg_start_backup(const char *backupids
*** 9468,9473 ****
--- 9469,9478 ----
                         (uint32) (startpoint >> 32), (uint32) startpoint, 
xlogfilename);
                appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n",
                                         (uint32) (checkpointloc >> 32), 
(uint32) checkpointloc);
+               if (incremental_startpoint > 0)
+                       appendStringInfo(&labelfbuf, "INCREMENTAL FROM 
LOCATION: %X/%X\n",
+                                                        (uint32) 
(incremental_startpoint >> 32),
+                                                        (uint32) 
incremental_startpoint);
                appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n",
                                                 exclusive ? "pg_start_backup" 
: "streamed");
                appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n",
diff --git a/src/backend/access/transam/xlogfuncs.c 
b/src/backend/access/transam/xlogfuncs.c
index 2179bf7..ace84d8 100644
*** a/src/backend/access/transam/xlogfuncs.c
--- b/src/backend/access/transam/xlogfuncs.c
*************** pg_start_backup(PG_FUNCTION_ARGS)
*** 59,65 ****
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                   errmsg("must be superuser or replication role to run a 
backup")));
  
!       startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL);
  
        PG_RETURN_LSN(startpoint);
  }
--- 59,65 ----
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                   errmsg("must be superuser or replication role to run a 
backup")));
  
!       startpoint = do_pg_start_backup(backupidstr, fast, 0, NULL, NULL);
  
        PG_RETURN_LSN(startpoint);
  }
diff --git a/src/backend/replication/basebackup.c 
b/src/backend/replication/basebackup.c
index 07030a2..05b19c5 100644
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 30,40 ****
--- 30,42 ----
  #include "replication/basebackup.h"
  #include "replication/walsender.h"
  #include "replication/walsender_private.h"
+ #include "storage/bufpage.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "utils/builtins.h"
  #include "utils/elog.h"
  #include "utils/ps_status.h"
+ #include "utils/pg_lsn.h"
  #include "utils/timestamp.h"
  
  
*************** typedef struct
*** 46,56 ****
        bool            nowait;
        bool            includewal;
        uint32          maxrate;
  } basebackup_options;
  
  
! static int64 sendDir(char *path, int basepathlen, bool sizeonly, List 
*tablespaces);
! static int64 sendTablespace(char *path, bool sizeonly);
  static bool sendFile(char *readfilename, char *tarfilename,
                 struct stat * statbuf, bool missing_ok);
  static void sendFileWithContent(const char *filename, const char *content);
--- 48,62 ----
        bool            nowait;
        bool            includewal;
        uint32          maxrate;
+       XLogRecPtr      incremental_startpoint;
  } basebackup_options;
  
  
! static int64 sendDir(char *path, int basepathlen, bool sizeonly,
!                                        List *tablespaces, bool has_relfiles,
!                                        XLogRecPtr incremental_startpoint);
! static int64 sendTablespace(char *path, bool sizeonly,
!                               XLogRecPtr incremental_startpoint);
  static bool sendFile(char *readfilename, char *tarfilename,
                 struct stat * statbuf, bool missing_ok);
  static void sendFileWithContent(const char *filename, const char *content);
*************** static void parse_basebackup_options(Lis
*** 64,69 ****
--- 70,81 ----
  static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
  static int    compareWalFileNames(const void *a, const void *b);
  static void throttle(size_t increment);
+ static bool relnodeIsNewerThanLSN(char *filename, struct stat * statbuf,
+                               XLogRecPtr *filemaxlsn, XLogRecPtr 
thresholdlsn);
+ static void writeBackupProfileLine(const char *filename, struct stat * 
statbuf,
+                                                                  bool 
has_maxlsn, XLogRecPtr filemaxlsn, bool sent);
+ static void sendBackupProfile(const char *labelfile);
+ static bool validateRelfilenodeName(char *name);
  
  /* Was the backup currently in-progress initiated in recovery mode? */
  static bool backup_started_in_recovery = false;
*************** static int64 elapsed_min_unit;
*** 93,98 ****
--- 105,116 ----
  /* The last check of the transfer rate. */
  static int64 throttled_last;
  
+ /* Temporary file containing the backup profile */
+ static File backup_profile_fd = 0;
+ 
+ /* Tablespace being currently sent. Used in backup profile generation */
+ static char *current_tablespace = NULL;
+ 
  typedef struct
  {
        char       *oid;
*************** perform_base_backup(basebackup_options *
*** 132,138 ****
  
        backup_started_in_recovery = RecoveryInProgress();
  
!       startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, 
&starttli,
                                                                  &labelfile);
        /*
         * Once do_pg_start_backup has been called, ensure that any failure 
causes
--- 150,160 ----
  
        backup_started_in_recovery = RecoveryInProgress();
  
!       /* Open a temporary file to hold the profile content. */
!       backup_profile_fd = OpenTemporaryFile(false);
! 
!       startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint,
!                                                                 
opt->incremental_startpoint, &starttli,
                                                                  &labelfile);
        /*
         * Once do_pg_start_backup has been called, ensure that any failure 
causes
*************** perform_base_backup(basebackup_options *
*** 208,214 ****
                        ti->oid = pstrdup(de->d_name);
                        ti->path = pstrdup(linkpath);
                        ti->rpath = relpath ? pstrdup(relpath) : NULL;
!                       ti->size = opt->progress ? sendTablespace(fullpath, 
true) : -1;
                        tablespaces = lappend(tablespaces, ti);
  #else
  
--- 230,237 ----
                        ti->oid = pstrdup(de->d_name);
                        ti->path = pstrdup(linkpath);
                        ti->rpath = relpath ? pstrdup(relpath) : NULL;
!                       ti->size = opt->progress ? sendTablespace(fullpath, 
true,
!                                                                               
        opt->incremental_startpoint) : -1;
                        tablespaces = lappend(tablespaces, ti);
  #else
  
*************** perform_base_backup(basebackup_options *
*** 225,231 ****
  
                /* Add a node for the base directory at the end */
                ti = palloc0(sizeof(tablespaceinfo));
!               ti->size = opt->progress ? sendDir(".", 1, true, tablespaces) : 
-1;
                tablespaces = lappend(tablespaces, ti);
  
                /* Send tablespace header */
--- 248,255 ----
  
                /* Add a node for the base directory at the end */
                ti = palloc0(sizeof(tablespaceinfo));
!               ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, 
false,
!                                                                               
   opt->incremental_startpoint) : -1;
                tablespaces = lappend(tablespaces, ti);
  
                /* Send tablespace header */
*************** perform_base_backup(basebackup_options *
*** 267,272 ****
--- 291,302 ----
                        pq_sendint(&buf, 0, 2);         /* natts */
                        pq_endmessage(&buf);
  
+                       /*
+                        * Save the current tablespace, used in 
writeBackupProfileLine
+                        * function
+                        */
+                       current_tablespace = ti->oid;
+ 
                        if (ti->path == NULL)
                        {
                                struct stat statbuf;
*************** perform_base_backup(basebackup_options *
*** 275,281 ****
                                sendFileWithContent(BACKUP_LABEL_FILE, 
labelfile);
  
                                /* ... then the bulk of the files ... */
!                               sendDir(".", 1, false, tablespaces);
  
                                /* ... and pg_control after everything else. */
                                if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
--- 305,311 ----
                                sendFileWithContent(BACKUP_LABEL_FILE, 
labelfile);
  
                                /* ... then the bulk of the files ... */
!                               sendDir(".", 1, false, tablespaces, false, 
opt->incremental_startpoint);
  
                                /* ... and pg_control after everything else. */
                                if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
*************** perform_base_backup(basebackup_options *
*** 284,292 ****
                                                         errmsg("could not stat 
control file \"%s\": %m",
                                                                        
XLOG_CONTROL_FILE)));
                                sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, 
&statbuf, false);
                        }
                        else
!                               sendTablespace(ti->path, false);
  
                        /*
                         * If we're including WAL, and this is the main data 
directory we
--- 314,323 ----
                                                         errmsg("could not stat 
control file \"%s\": %m",
                                                                        
XLOG_CONTROL_FILE)));
                                sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, 
&statbuf, false);
+                               writeBackupProfileLine(XLOG_CONTROL_FILE, 
&statbuf, false, 0, true);
                        }
                        else
!                               sendTablespace(ti->path, false, 
opt->incremental_startpoint);
  
                        /*
                         * If we're including WAL, and this is the main data 
directory we
*************** perform_base_backup(basebackup_options *
*** 501,507 ****
  
                        FreeFile(fp);
  
!                       /*
                         * Mark file as archived, otherwise files can get 
archived again
                         * after promotion of a new node. This is in line with
                         * walreceiver.c always doing a XLogArchiveForceDone() 
after a
--- 532,541 ----
  
                        FreeFile(fp);
  
!                       /* Add the WAL file to backup profile */
!                       writeBackupProfileLine(pathbuf, &statbuf, false, 0, 
true);
! 
!                   /*
                         * Mark file as archived, otherwise files can get 
archived again
                         * after promotion of a new node. This is in line with
                         * walreceiver.c always doing a XLogArchiveForceDone() 
after a
*************** perform_base_backup(basebackup_options *
*** 533,538 ****
--- 567,575 ----
  
                        sendFile(pathbuf, pathbuf, &statbuf, false);
  
+                       /* Add the WAL file to backup profile */
+                       writeBackupProfileLine(pathbuf, &statbuf, false, 0, 
true);
+ 
                        /* unconditionally mark file as archived */
                        StatusFilePath(pathbuf, fname, ".done");
                        sendFileWithContent(pathbuf, "");
*************** perform_base_backup(basebackup_options *
*** 542,547 ****
--- 579,587 ----
                pq_putemptymessage('c');
        }
        SendXlogRecPtrResult(endptr, endtli);
+ 
+       /* Send the profile file. */
+       sendBackupProfile(labelfile);
  }
  
  /*
*************** parse_basebackup_options(List *options, 
*** 570,575 ****
--- 610,616 ----
        bool            o_nowait = false;
        bool            o_wal = false;
        bool            o_maxrate = false;
+       bool            o_incremental = false;
  
        MemSet(opt, 0, sizeof(*opt));
        foreach(lopt, options)
*************** parse_basebackup_options(List *options, 
*** 640,645 ****
--- 681,698 ----
                        opt->maxrate = (uint32) maxrate;
                        o_maxrate = true;
                }
+               else if (strcmp(defel->defname, "incremental") == 0)
+               {
+                       if (o_incremental)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("duplicate option 
\"%s\"", defel->defname)));
+ 
+                       opt->incremental_startpoint = DatumGetLSN(
+                               DirectFunctionCall1(pg_lsn_in,
+                                                                       
CStringGetDatum(strVal(defel->arg))));
+                       o_incremental = true;
+               }
                else
                        elog(ERROR, "option \"%s\" not recognized",
                                 defel->defname);
*************** sendFileWithContent(const char *filename
*** 859,864 ****
--- 912,920 ----
                MemSet(buf, 0, pad);
                pq_putmessage('d', buf, pad);
        }
+ 
+       /* Write a backup profile entry for this file. */
+       writeBackupProfileLine(filename, &statbuf, false, 0, true);
  }
  
  /*
*************** sendFileWithContent(const char *filename
*** 869,875 ****
   * Only used to send auxiliary tablespaces, not PGDATA.
   */
  static int64
! sendTablespace(char *path, bool sizeonly)
  {
        int64           size;
        char            pathbuf[MAXPGPATH];
--- 925,931 ----
   * Only used to send auxiliary tablespaces, not PGDATA.
   */
  static int64
! sendTablespace(char *path, bool sizeonly, XLogRecPtr incremental_startpoint)
  {
        int64           size;
        char            pathbuf[MAXPGPATH];
*************** sendTablespace(char *path, bool sizeonly
*** 902,908 ****
        size = 512;                                     /* Size of the header 
just added */
  
        /* Send all the files in the tablespace version directory */
!       size += sendDir(pathbuf, strlen(path), sizeonly, NIL);
  
        return size;
  }
--- 958,964 ----
        size = 512;                                     /* Size of the header 
just added */
  
        /* Send all the files in the tablespace version directory */
!       size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, 
incremental_startpoint);
  
        return size;
  }
*************** sendTablespace(char *path, bool sizeonly
*** 914,922 ****
   *
   * Omit any directory in the tablespaces list, to avoid backing up
   * tablespaces twice when they were created inside PGDATA.
   */
  static int64
! sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
  {
        DIR                *dir;
        struct dirent *de;
--- 970,982 ----
   *
   * Omit any directory in the tablespaces list, to avoid backing up
   * tablespaces twice when they were created inside PGDATA.
+  *
+  * If 'has_relfiles' is set, this directory will be checked to identify
+  * relnode files and compute their maxLSN.
   */
  static int64
! sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces,
!               bool has_relfiles, XLogRecPtr incremental_startpoint)
  {
        DIR                *dir;
        struct dirent *de;
*************** sendDir(char *path, int basepathlen, boo
*** 1124,1138 ****
                                }
                        }
                        if (!skip_this_dir)
!                               size += sendDir(pathbuf, basepathlen, sizeonly, 
tablespaces);
                }
                else if (S_ISREG(statbuf.st_mode))
                {
                        bool            sent = false;
  
                        if (!sizeonly)
!                               sent = sendFile(pathbuf, pathbuf + basepathlen 
+ 1, &statbuf,
!                                                               true);
  
                        if (sent || sizeonly)
                        {
--- 1184,1235 ----
                                }
                        }
                        if (!skip_this_dir)
!                       {
!                               bool    subdir_has_relfiles;
! 
!                               /*
!                                * Whithin PGDATA relnode files are contained 
only in "global"
!                                * and "base" directory
!                                */
!                               subdir_has_relfiles = has_relfiles
!                                       || strcmp(pathbuf, "./global") == 0
!                                       || strcmp(pathbuf, "./base") == 0;
! 
!                               size += sendDir(pathbuf, basepathlen, sizeonly, 
tablespaces,
!                                                               
subdir_has_relfiles, incremental_startpoint);
!                       }
                }
                else if (S_ISREG(statbuf.st_mode))
                {
                        bool            sent = false;
  
                        if (!sizeonly)
!                       {
!                               bool            is_relfile;
!                               XLogRecPtr      filemaxlsn = 0;
! 
!                               /*
!                                * If the current directory can have relnode 
files, check the file
!                                * name to see if it is one of them.
!                                */
!                               is_relfile = has_relfiles && 
validateRelfilenodeName(de->d_name);
! 
!                               if (!is_relfile
!                                       || incremental_startpoint == 0
!                                       || relnodeIsNewerThanLSN(pathbuf, 
&statbuf, &filemaxlsn,
!                                                                               
          incremental_startpoint))
!                               {
!                                       sent = sendFile(pathbuf, pathbuf + 
basepathlen + 1,
!                                                                       
&statbuf, true);
!                                       /* Write a backup profile entry for the 
sent file. */
!                                       writeBackupProfileLine(pathbuf + 
basepathlen + 1, &statbuf,
!                                                                               
   false, 0, sent);
!                               }
!                               else
!                                       /* Write a backup profile entry for the 
skipped file. */
!                                       writeBackupProfileLine(pathbuf + 
basepathlen + 1, &statbuf,
!                                                                               
   true, filemaxlsn, sent);
!                       }
  
                        if (sent || sizeonly)
                        {
*************** throttle(size_t increment)
*** 1327,1329 ****
--- 1424,1636 ----
                /* Sleep was necessary but might have been interrupted. */
                throttled_last = GetCurrentIntegerTimestamp();
  }
+ 
+ /*
+  * Check whether a relnode file contains at least one page whose LSN is
+  * greater than or equal to 'thresholdlsn'.
+  *
+  * Returns true when the file has to be sent: a page at or past the
+  * threshold was found, the file ends with a partial page (whose LSN cannot
+  * be trusted), or the file has disappeared (the caller passes missing_ok
+  * to sendFile, which deals with that case).  In all these cases
+  * *filemaxlsn is set to 0, because the scan was not complete.
+  *
+  * Returns false when every page is older than the threshold, so the file
+  * can be safely skipped during an incremental backup; *filemaxlsn then
+  * holds the highest page LSN found in the file.
+  *
+  * Only the first statbuf->st_size bytes are examined.  The file could be
+  * longer, if it was extended after the stat, but for a base backup we can
+  * ignore such extended data: it will be restored from WAL.
+  *
+  * Raises an ERROR if the file cannot be opened (for any reason other than
+  * ENOENT) or cannot be read.
+  */
+ static bool
+ relnodeIsNewerThanLSN(char *filename, struct stat * statbuf,
+               XLogRecPtr *filemaxlsn, XLogRecPtr thresholdlsn)
+ {
+       FILE       *fp;
+       char            buf[BLCKSZ];
+       size_t          cnt;
+       pgoff_t         len = 0;
+       XLogRecPtr      pagelsn;
+       bool            newer = false;
+ 
+       *filemaxlsn = 0;
+ 
+       fp = AllocateFile(filename, "rb");
+       if (fp == NULL)
+       {
+               if (errno == ENOENT)
+                       return true;
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not open file \"%s\": %m", filename)));
+       }
+ 
+       while (len < statbuf->st_size &&
+                  (cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+       {
+               /* Track how much of the file has been examined so far */
+               len += cnt;
+ 
+               /*
+                * A short read means the file does not end on a page boundary
+                * (or was truncated under us).  The buffer then holds stale
+                * bytes from the previous page, so do not trust its LSN: be
+                * conservative and have the file sent.
+                */
+               if (cnt < sizeof(buf))
+               {
+                       newer = true;
+                       break;
+               }
+ 
+               pagelsn = PageGetLSN(buf);
+ 
+               /* Keep the max LSN found */
+               if (*filemaxlsn < pagelsn)
+                       *filemaxlsn = pagelsn;
+ 
+               /*
+                * If a page has an LSN at or past the threshold, stop scanning:
+                * the file has to be sent.
+                */
+               if (thresholdlsn <= pagelsn)
+               {
+                       newer = true;
+                       break;
+               }
+       }
+ 
+       /* A read failure must not be mistaken for "file unchanged" */
+       if (ferror(fp))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read file \"%s\": %m", filename)));
+ 
+       /* Release the file on every path, including the early exits above */
+       FreeFile(fp);
+ 
+       /* The max LSN is only partial when the scan stopped early */
+       if (newer)
+               *filemaxlsn = 0;
+ 
+       return newer;
+ }
+ 
+ /*
+  * Write an entry in the file list section of the backup profile.
+  *
+  * The entry is a tab-separated row terminated by a newline: tablespace oid
+  * (\N when no tablespace is current), max LSN (\N when 'has_maxlsn' is
+  * false), sent flag ("t" or "f"), mtime, size and file name.  The row is
+  * appended to the temporary file backup_profile_fd.
+  *
+  * Raises an ERROR if the row does not fit in the local buffer or cannot be
+  * written completely.
+  */
+ static void
+ writeBackupProfileLine(const char *filename, struct stat * statbuf,
+                                          bool has_maxlsn, XLogRecPtr filemaxlsn, bool sent)
+ {
+       /*
+        * tablespace oid (10) + max LSN (17) + sent flag (1) + mtime (10) +
+        * size (19) + separators (6) + trailing \0 = 64 bytes in addition to
+        * the path (MAXPGPATH).
+        */
+       char    buf[MAXPGPATH + 65];
+       /* "%X/%X" takes up to 8 + 1 + 8 characters, plus the trailing \0 */
+       char    maxlsn[18];
+       int             rowlen;
+ 
+       Assert(backup_profile_fd > 0);
+ 
+       /* Prepare maxlsn */
+       if (has_maxlsn)
+               snprintf(maxlsn, sizeof(maxlsn), "%X/%X",
+                                (uint32) (filemaxlsn >> 32), (uint32) filemaxlsn);
+       else
+               strlcpy(maxlsn, "\\N", sizeof(maxlsn));
+ 
+       /* st_size is an off_t: cast it to match the %lld conversion */
+       rowlen = snprintf(buf, sizeof(buf), "%s\t%s\t%s\t%u\t%lld\t%s\n",
+                                         current_tablespace ? current_tablespace : "\\N",
+                                         maxlsn,
+                                         sent ? "t" : "f",
+                                         (uint32) statbuf->st_mtime,
+                                         (long long) statbuf->st_size,
+                                         filename);
+ 
+       /*
+        * snprintf can report the length the row would have had without
+        * truncation: never use it to write past the end of buf.
+        */
+       if (rowlen < 0 || rowlen >= (int) sizeof(buf))
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("backup profile entry for file \"%s\" is too long",
+                                               filename)));
+ 
+       errno = 0;
+       if (FileWrite(backup_profile_fd, buf, rowlen) != rowlen)
+       {
+               /* if write didn't set errno, assume problem is no disk space */
+               if (errno == 0)
+                       errno = ENOSPC;
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not write to backup profile temporary file: %m")));
+       }
+ }
+ 
+ /*
+  * Send the backup profile to the client.
+  *
+  * The profile is wrapped in its own CopyOutResponse containing a tar stream
+  * with only one file (BACKUP_PROFILE_FILE).  The file content is a header
+  * section (BACKUP_PROFILE_HEADER, the backup label passed in 'labelfile'
+  * and BACKUP_PROFILE_SEPARATOR) followed by the file list accumulated in
+  * the temporary file backup_profile_fd.  The temporary file is closed once
+  * it has been sent.
+  *
+  * Raises an ERROR if the temporary file cannot be inspected or read, if the
+  * header section does not fit in the send buffer, or if data cannot be sent
+  * to the client.
+  */
+ static void
+ sendBackupProfile(const char *labelfile)
+ {
+       StringInfoData msgbuf;
+       struct stat statbuf;
+       char            buf[TAR_SEND_SIZE];
+       int                     hdrlen;
+       int                     nread;
+       pgoff_t         len = 0;
+       size_t          pad;
+       char *backup_profile = FilePathName(backup_profile_fd);
+ 
+       /* Send CopyOutResponse message */
+       pq_beginmessage(&msgbuf, 'H');
+       pq_sendbyte(&msgbuf, 0);                /* overall format */
+       pq_sendint(&msgbuf, 0, 2);              /* natts */
+       pq_endmessage(&msgbuf);
+ 
+       if (lstat(backup_profile, &statbuf) != 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not stat backup_profile file \"%s\": %m",
+                                               backup_profile)));
+ 
+       /* Set the file position to the beginning. */
+       if (FileSeek(backup_profile_fd, 0, SEEK_SET) != 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not seek in backup_profile file \"%s\": %m",
+                                               backup_profile)));
+ 
+       /*
+        * Fill the buffer with the content of the backup profile header
+        * section.  Being the concatenation of two separators and the backup
+        * label, it is expected to be shorter than TAR_SEND_SIZE; verify that
+        * anyway, because snprintf can report the untruncated length and that
+        * value is used below as the number of bytes to send.
+        */
+       hdrlen = snprintf(buf, sizeof(buf), "%s\n%s%s\n",
+                                         BACKUP_PROFILE_HEADER,
+                                         labelfile,
+                                         BACKUP_PROFILE_SEPARATOR);
+       if (hdrlen < 0 || hdrlen >= (int) sizeof(buf))
+               ereport(ERROR,
+                               (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
+                                errmsg("backup profile header is too long")));
+ 
+       /* Add size of backup label and separators */
+       statbuf.st_size += hdrlen;
+ 
+       _tarWriteHeader(BACKUP_PROFILE_FILE, NULL, &statbuf);
+ 
+       /* Send backup profile header */
+       if (pq_putmessage('d', buf, hdrlen))
+               ereport(ERROR,
+                               (errmsg("base backup could not send data, aborting backup")));
+ 
+       len += hdrlen;
+       throttle(hdrlen);
+ 
+       /*
+        * FileRead returns a negative value on error, so keep its result in a
+        * signed variable: stored in a size_t it would look like a huge chunk.
+        */
+       while ((nread = FileRead(backup_profile_fd, buf, sizeof(buf))) > 0)
+       {
+               /* Send the chunk as a CopyData message */
+               if (pq_putmessage('d', buf, nread))
+                       ereport(ERROR,
+                                       (errmsg("base backup could not send data, aborting backup")));
+ 
+               len += nread;
+               throttle(nread);
+       }
+ 
+       if (nread < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not read backup_profile file \"%s\": %m",
+                                               backup_profile)));
+ 
+       /*
+        * Pad to 512 byte boundary, per tar format requirements. (This small
+        * piece of data is probably not worth throttling.)
+        */
+       pad = ((len + 511) & ~511) - len;
+       if (pad > 0)
+       {
+               MemSet(buf, 0, pad);
+               pq_putmessage('d', buf, pad);
+       }
+ 
+       pq_putemptymessage('c');        /* CopyDone */
+ 
+       /*
+        * The profile has been sent: release the temporary file instead of
+        * keeping it open.  backup_profile points into the file's own
+        * bookkeeping, so it must not be used past this point.
+        */
+       FileClose(backup_profile_fd);
+       backup_profile_fd = 0;
+ }
+ 
+ /*
+  * relfilenode name validation.
+  *
+  * Accepted format: [0-9]+(_[a-z]+)?(\.[0-9]+)?
+  *
+  * That is a numeric relfilenode, optionally followed by a fork suffix
+  * (such as _vm, _fsm or _init) and then by a segment number.  Each part
+  * that is present must be non-empty, so names like "", "_vm", ".1", "123_"
+  * or "123." are rejected.
+  *
+  * Returns true if 'name' matches the format.  A file that is not
+  * recognised here is not treated as a relation file, so it is always sent
+  * rather than being subject to the incremental LSN check.
+  */
+ static bool
+ validateRelfilenodeName(char *name)
+ {
+       int                     pos = 0;
+       int                     start;
+ 
+       /* Mandatory relfilenode: at least one digit */
+       while ((name[pos] >= '0') && (name[pos] <= '9'))
+               pos++;
+       if (pos == 0)
+               return false;
+ 
+       /* Optional fork suffix: '_' followed by at least one lowercase letter */
+       if (name[pos] == '_')
+       {
+               start = ++pos;
+               while ((name[pos] >= 'a') && (name[pos] <= 'z'))
+                       pos++;
+               if (pos == start)
+                       return false;
+       }
+ 
+       /* Optional segment number: '.' followed by at least one digit */
+       if (name[pos] == '.')
+       {
+               start = ++pos;
+               while ((name[pos] >= '0') && (name[pos] <= '9'))
+                       pos++;
+               if (pos == start)
+                       return false;
+       }
+ 
+       /* Nothing may follow the recognised parts */
+       return name[pos] == '\0';
+ }
diff --git a/src/backend/replication/repl_gram.y 
b/src/backend/replication/repl_gram.y
index 2a41eb1..684cf4d 100644
*** a/src/backend/replication/repl_gram.y
--- b/src/backend/replication/repl_gram.y
*************** Node *replication_parse_result;
*** 75,80 ****
--- 75,81 ----
  %token K_PHYSICAL
  %token K_LOGICAL
  %token K_SLOT
+ %token K_INCREMENTAL
  
  %type <node>  command
  %type <node>  base_backup start_replication start_logical_replication 
create_replication_slot drop_replication_slot identify_system timeline_history
*************** base_backup_opt:
*** 168,173 ****
--- 169,179 ----
                                  $$ = makeDefElem("max_rate",
                                                                   (Node 
*)makeInteger($2));
                                }
+                       | K_INCREMENTAL SCONST
+                               {
+                                 $$ = makeDefElem("incremental",
+                                                                  (Node 
*)makeString($2));
+                               }
                        ;
  
  create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l 
b/src/backend/replication/repl_scanner.l
index 449c127..a6d0dd8 100644
*** a/src/backend/replication/repl_scanner.l
--- b/src/backend/replication/repl_scanner.l
*************** TIMELINE_HISTORY        { return K_TIMELINE_HIS
*** 96,101 ****
--- 96,102 ----
  PHYSICAL                      { return K_PHYSICAL; }
  LOGICAL                               { return K_LOGICAL; }
  SLOT                          { return K_SLOT; }
+ INCREMENTAL                   { return K_INCREMENTAL; }
  
  ","                           { return ','; }
  ";"                           { return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c 
b/src/bin/pg_basebackup/pg_basebackup.c
index fbf7106..892472d 100644
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
*************** static bool writerecoveryconf = false;
*** 67,72 ****
--- 67,74 ----
  static int    standby_message_timeout = 10 * 1000;            /* 10 sec = 
default */
  static pg_time_t last_progress_report = 0;
  static int32 maxrate = 0;             /* no limit by default */
+ static XLogRecPtr incremental_startpoint = 0;
+ static TimeLineID incremental_timeline = 0;
  
  
  /* Progress counters */
*************** static void usage(void);
*** 99,107 ****
  static void disconnect_and_exit(int code);
  static void verify_dir_is_empty_or_create(char *dirname);
  static void progress_report(int tablespacenum, const char *filename, bool 
force);
  
  static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
! static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
  static void GenerateRecoveryConf(PGconn *conn);
  static void WriteRecoveryConf(void);
  static void BaseBackup(void);
--- 101,111 ----
  static void disconnect_and_exit(int code);
  static void verify_dir_is_empty_or_create(char *dirname);
  static void progress_report(int tablespacenum, const char *filename, bool 
force);
+ static void read_backup_profile_header(const char *profile_path);
  
  static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
! static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum,
!                                                                       const 
char *dest_path);
  static void GenerateRecoveryConf(PGconn *conn);
  static void WriteRecoveryConf(void);
  static void BaseBackup(void);
*************** usage(void)
*** 232,237 ****
--- 236,243 ----
        printf(_("\nOptions controlling the output:\n"));
        printf(_("  -D, --pgdata=DIRECTORY receive base backup into 
directory\n"));
        printf(_("  -F, --format=p|t       output format (plain (default), 
tar)\n"));
+       printf(_("  -I, --incremental=PROFILE\n"
+                        "                         enable incremental from 
given backup profile\n"));
        printf(_("  -r, --max-rate=RATE    maximum transfer rate to transfer 
data directory\n"
                         "                         (in kB/s, or use suffix 
\"k\" or \"M\")\n"));
        printf(_("  -R, --write-recovery-conf\n"
*************** parse_max_rate(char *src)
*** 717,722 ****
--- 723,778 ----
        return (int32) result;
  }
  
+ 
+ /*
+  * Read incremental_startpoint and incremental_timeline
+  * from a backup profile.
+  */
+ static void
+ read_backup_profile_header(const char *profile_path)
+ {
+       FILE       *lfp;
+       char            ch;
+       uint32          hi,
+                               lo;
+ 
+       /*
+        * Open the backup profile
+        */
+       lfp = fopen(profile_path, "r");
+       if (!lfp)
+       {
+               fprintf(stderr, _("%s: could not read file \"%s\": %s\n"),
+                               progname, profile_path, strerror(errno));
+               exit(1);
+       }
+ 
+       /* Consume the profile header, don't fail if the header is absent */
+       fscanf(lfp, "POSTGRESQL BACKUP PROFILE 1\n");
+ 
+       /*
+        * Read and parse the START WAL LOCATION (this code
+        * is pretty crude, but we are not expecting any variability in the file
+        * format).
+        */
+       if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%*16s)%c",
+                          &hi, &lo, &incremental_timeline, &ch) != 4 || ch != 
'\n')
+       {
+               fprintf(stderr, _("%s: invalid data in file \"%s\"\n"),
+                               progname, profile_path);
+               exit(1);
+       }
+       incremental_startpoint = ((uint64) hi) << 32 | lo;
+ 
+       if (ferror(lfp) || fclose(lfp))
+       {
+               fprintf(stderr, _("%s: could not read file \"%s\": %s\n"),
+                               progname, profile_path, strerror(errno));
+               exit(1);
+       }
+ }
+ 
+ 
  /*
   * Write a piece of tar data
   */
*************** get_tablespace_mapping(const char *dir)
*** 1128,1136 ****
   * If the data is for the main data directory, it will be restored in the
   * specified directory. If it's for another tablespace, it will be restored
   * in the original or mapped directory.
   */
  static void
! ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
  {
        char            current_path[MAXPGPATH];
        char            filename[MAXPGPATH];
--- 1184,1199 ----
   * If the data is for the main data directory, it will be restored in the
   * specified directory. If it's for another tablespace, it will be restored
   * in the original or mapped directory.
+  *
+  * If 'res' is NULL, the destination directory is taken from the
+  * 'dest_path' parameter.
+  *
+  * When 'dest_path' is specified, progress is not reported because the
+  * content does not belong to any tablespace.
   */
  static void
! ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum,
!                                               const char *dest_path)
  {
        char            current_path[MAXPGPATH];
        char            filename[MAXPGPATH];
*************** ReceiveAndUnpackTarFile(PGconn *conn, PG
*** 1141,1153 ****
        char       *copybuf = NULL;
        FILE       *file = NULL;
  
!       basetablespace = PQgetisnull(res, rownum, 0);
!       if (basetablespace)
!               strlcpy(current_path, basedir, sizeof(current_path));
        else
!               strlcpy(current_path,
!                               get_tablespace_mapping(PQgetvalue(res, rownum, 
1)),
!                               sizeof(current_path));
  
        /*
         * Get the COPY data
--- 1204,1231 ----
        char       *copybuf = NULL;
        FILE       *file = NULL;
  
!       /* 'res' and 'dest_path' are mutually exclusive */
!       assert(!res != !dest_path);
! 
!       /*
!        * If 'res' is NULL, the destination directory is taken from the
!        * 'dest_path' parameter.
!        */
!       if (res)
!       {
!               basetablespace = PQgetisnull(res, rownum, 0);
!               if (basetablespace)
!                       strlcpy(current_path, basedir, sizeof(current_path));
!               else
!                       strlcpy(current_path,
!                                       get_tablespace_mapping(PQgetvalue(res, 
rownum, 1)),
!                                       sizeof(current_path));
!       }
        else
!       {
!               basetablespace = false;
!               strlcpy(current_path, dest_path, sizeof(current_path));
!       }
  
        /*
         * Get the COPY data
*************** ReceiveAndUnpackTarFile(PGconn *conn, PG
*** 1355,1361 ****
                                disconnect_and_exit(1);
                        }
                        totaldone += r;
!                       progress_report(rownum, filename, false);
  
                        current_len_left -= r;
                        if (current_len_left == 0 && current_padding == 0)
--- 1433,1441 ----
                                disconnect_and_exit(1);
                        }
                        totaldone += r;
!                       /* report progress unless a custom destination is used 
*/
!                       if (!dest_path)
!                               progress_report(rownum, filename, false);
  
                        current_len_left -= r;
                        if (current_len_left == 0 && current_padding == 0)
*************** ReceiveAndUnpackTarFile(PGconn *conn, PG
*** 1371,1377 ****
                        }
                }                                               /* continuing 
data in existing file */
        }                                                       /* loop over 
all data blocks */
!       progress_report(rownum, filename, true);
  
        if (file != NULL)
        {
--- 1451,1459 ----
                        }
                }                                               /* continuing 
data in existing file */
        }                                                       /* loop over 
all data blocks */
!       /* report progress unless a custom destination is used */
!       if (!dest_path)
!               progress_report(rownum, filename, true);
  
        if (file != NULL)
        {
*************** BaseBackup(void)
*** 1587,1592 ****
--- 1669,1675 ----
        char       *basebkp;
        char            escaped_label[MAXPGPATH];
        char       *maxrate_clause = NULL;
+       char       *incremental_clause = NULL;
        int                     i;
        char            xlogstart[64];
        char            xlogend[64];
*************** BaseBackup(void)
*** 1648,1661 ****
        if (maxrate > 0)
                maxrate_clause = psprintf("MAX_RATE %u", maxrate);
  
        basebkp =
!               psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s",
                                 escaped_label,
                                 showprogress ? "PROGRESS" : "",
                                 includewal && !streamwal ? "WAL" : "",
                                 fastcheckpoint ? "FAST" : "",
                                 includewal ? "NOWAIT" : "",
!                                maxrate_clause ? maxrate_clause : "");
  
        if (PQsendQuery(conn, basebkp) == 0)
        {
--- 1731,1770 ----
        if (maxrate > 0)
                maxrate_clause = psprintf("MAX_RATE %u", maxrate);
  
+       if (incremental_startpoint > 0)
+       {
+               incremental_clause = psprintf("INCREMENTAL '%X/%X'",
+                                                                         
(uint32) (incremental_startpoint >> 32),
+                                                                         
(uint32) incremental_startpoint);
+ 
+               /*
+                * Sanity check: if from a different timeline abort the backup.
+                */
+               if (latesttli != incremental_timeline)
+               {
+                       fprintf(stderr,
+                                       _("%s: incremental backup from a 
different timeline "
+                                         "is not supported: base=%u 
current=%u\n"),
+                                       progname, incremental_timeline, 
latesttli);
+                       disconnect_and_exit(1);
+               }
+ 
+               if (verbose)
+                       fprintf(stderr, _("incremental from point: %X/%X on 
timeline %u\n"),
+                                       (uint32) (incremental_startpoint >> 32),
+                                       (uint32) incremental_startpoint,
+                                       incremental_timeline);
+       }
+ 
        basebkp =
!               psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s",
                                 escaped_label,
                                 showprogress ? "PROGRESS" : "",
                                 includewal && !streamwal ? "WAL" : "",
                                 fastcheckpoint ? "FAST" : "",
                                 includewal ? "NOWAIT" : "",
!                                maxrate_clause ? maxrate_clause : "",
!                                incremental_clause ? incremental_clause : "");
  
        if (PQsendQuery(conn, basebkp) == 0)
        {
*************** BaseBackup(void)
*** 1769,1775 ****
                if (format == 't')
                        ReceiveTarFile(conn, res, i);
                else
!                       ReceiveAndUnpackTarFile(conn, res, i);
        }                                                       /* Loop over 
all tablespaces */
  
        if (showprogress)
--- 1878,1884 ----
                if (format == 't')
                        ReceiveTarFile(conn, res, i);
                else
!                       ReceiveAndUnpackTarFile(conn, res, i, NULL);
        }                                                       /* Loop over 
all tablespaces */
  
        if (showprogress)
*************** BaseBackup(void)
*** 1803,1808 ****
--- 1912,1922 ----
                fprintf(stderr, "transaction log end point: %s\n", xlogend);
        PQclear(res);
  
+       /*
+        * Get the backup profile
+        */
+       ReceiveAndUnpackTarFile(conn, NULL, -1, basedir);
+ 
        res = PQgetResult(conn);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
*************** main(int argc, char **argv)
*** 1942,1947 ****
--- 2056,2062 ----
                {"username", required_argument, NULL, 'U'},
                {"no-password", no_argument, NULL, 'w'},
                {"password", no_argument, NULL, 'W'},
+               {"incremental", required_argument, NULL, 'I'},
                {"status-interval", required_argument, NULL, 's'},
                {"verbose", no_argument, NULL, 'v'},
                {"progress", no_argument, NULL, 'P'},
*************** main(int argc, char **argv)
*** 1949,1955 ****
                {NULL, 0, NULL, 0}
        };
        int                     c;
- 
        int                     option_index;
  
        progname = get_progname(argv[0]);
--- 2064,2069 ----
*************** main(int argc, char **argv)
*** 1970,1976 ****
                }
        }
  
!       while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:wWvP",
                                                        long_options, 
&option_index)) != -1)
        {
                switch (c)
--- 2084,2090 ----
                }
        }
  
!       while ((c = getopt_long(argc, argv, 
"D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:wWI:vP",
                                                        long_options, 
&option_index)) != -1)
        {
                switch (c)
*************** main(int argc, char **argv)
*** 2088,2093 ****
--- 2202,2210 ----
                        case 'W':
                                dbgetpassword = 1;
                                break;
+                       case 'I':
+                               read_backup_profile_header(optarg);
+                               break;
                        case 's':
                                standby_message_timeout = atoi(optarg) * 1000;
                                if (standby_message_timeout < 0)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 138deaf..4bb261a 100644
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
*************** extern void SetWalWriterSleeping(bool sl
*** 249,255 ****
   * Starting/stopping a base backup
   */
  extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
!                                  TimeLineID *starttli_p, char **labelfile);
  extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
                                  TimeLineID *stoptli_p);
  extern void do_pg_abort_backup(void);
--- 249,256 ----
   * Starting/stopping a base backup
   */
  extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
!                                 XLogRecPtr incremental_startpoint,
!                                 TimeLineID *starttli_p, char **labelfile);
  extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
                                  TimeLineID *stoptli_p);
  extern void do_pg_abort_backup(void);
diff --git a/src/include/replication/basebackup.h 
b/src/include/replication/basebackup.h
index 64f2bd5..9182c0a 100644
*** a/src/include/replication/basebackup.h
--- b/src/include/replication/basebackup.h
***************
*** 20,25 ****
--- 20,29 ----
  #define MAX_RATE_LOWER        32
  #define MAX_RATE_UPPER        1048576
  
+ /* Backup profile */
+ #define BACKUP_PROFILE_FILE                   "backup_profile"
+ #define BACKUP_PROFILE_HEADER         "POSTGRESQL BACKUP PROFILE 1"
+ #define BACKUP_PROFILE_SEPARATOR      "FILE LIST"
  
  extern void SendBaseBackup(BaseBackupCmd *cmd);
  
-- 
2.2.1

#!/usr/bin/env python

from __future__ import print_function

import os
import shutil
import sys
from optparse import OptionParser

# Proof-of-concept restore tool for file-based incremental backups.
#
# Usage: <script> [-T olddir=newdir ...] base incremental destination
#
# The incremental backup is copied to the destination, then every file that
# the incremental backup's 'backup_profile' lists with a non-null LSN is
# copied over from the base backup.


def _die(message):
    """Print *message* on stderr and terminate with exit status 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def _parse_tablespace_mapping(mappings):
    """Convert a list of 'olddir=newdir' strings into an {olddir: newdir} dict.

    Exits with an error if an entry does not contain exactly one '='.
    """
    result = {}
    for mapping in mappings:
        try:
            olddir, newdir = mapping.split('=')
        except ValueError:
            # Only the unpacking failure is expected here; a bare 'except'
            # would also swallow SystemExit and KeyboardInterrupt.
            _die("error: invalid tablespace mapping (%s)" % mapping)
        result[olddir] = newdir
    return result


def _collect_tablespaces(incr, tablespace_mapping):
    """Return a list of (name, old_target, new_target) for incr/pg_tblspc.

    Every entry of the directory must be a symbolic link whose target has an
    entry in *tablespace_mapping*; otherwise the program exits with an error.
    Nothing is written to disk by this function.
    """
    incr_tblspc = os.path.join(incr, 'pg_tblspc')
    tablespaces = []
    for tblspc in os.listdir(incr_tblspc):
        incr_file = os.path.join(incr_tblspc, tblspc)
        if not os.path.islink(incr_file):
            _die("error: illegal file in source pg_tblspc directory (%s)"
                 % incr_file)
        old_target = os.readlink(incr_file)
        if old_target not in tablespace_mapping:
            _die("error: missing tablespace mapping (%s)" % old_target)
        tablespaces.append(
            (tblspc, old_target, tablespace_mapping[old_target]))
    return tablespaces


def main(argv=None):
    """Rebuild a full backup from a base and an incremental backup.

    *argv* is the list of command line arguments without the program name;
    when None, optparse falls back to sys.argv[1:].

    Returns 0 on success. Any usage or validation error is reported on
    stderr and terminates the process with exit status 1.
    """
    parser = OptionParser()
    parser.add_option("-T", "--tablespace-mapping",
                      dest="tablespace_mapping",
                      default=[],
                      action="append",
                      help="Relocate the tablespace in directory olddir "
                           "to newdir",
                      metavar="olddir=newdir")
    (options, args) = parser.parse_args(argv)

    tablespace_mapping = _parse_tablespace_mapping(options.tablespace_mapping)

    if len(args) != 3:
        _die("usage: %s base incremental destination" % sys.argv[0])

    base, incr, dest = args

    if os.path.exists(dest):
        _die("error: destination must not exist (%s)" % dest)

    profile_path = os.path.join(incr, 'backup_profile')
    with open(profile_path, 'r') as profile:
        # Skip the profile header: the file list starts after the
        # 'FILE LIST' separator line.
        for line in profile:
            if line.strip() == 'FILE LIST':
                break
        else:
            # Without the separator the file list would be empty and the
            # restore would silently miss every file taken from the base.
            _die("error: invalid backup profile, 'FILE LIST' not found (%s)"
                 % profile_path)

        # Validate every tablespace before touching the destination, so a
        # missing mapping does not leave a partially restored directory.
        tablespaces = _collect_tablespaces(incr, tablespace_mapping)

        shutil.copytree(incr, dest, symlinks=True)

        # tablespaces preparation: point each link at its new location and
        # copy the tablespace content there
        dest_tblspc = os.path.join(dest, 'pg_tblspc')
        for tblspc, old_target, new_target in tablespaces:
            dest_file = os.path.join(dest_tblspc, tblspc)
            os.unlink(dest_file)
            os.symlink(new_target, dest_file)
            shutil.copytree(old_target, new_target)

        # Remaining lines are tab separated entries:
        #   tablespace, lsn, sent, date, size, path
        # where '\N' stands for a null value.
        for line in profile:
            line = line.rstrip('\n')
            if not line:
                continue
            # Limit the split so that a tab inside the path is preserved.
            fields = line.split('\t', 5)
            if len(fields) != 6:
                _die("error: invalid backup profile entry (%s)" % line)
            tblspc, lsn, _sent, _date, _size, path = fields
            # Entries with a null LSN are not taken from the base backup
            # (presumably they were sent with the incremental one).
            if lsn == '\\N':
                continue
            if tblspc != '\\N':
                path = os.path.join('pg_tblspc', tblspc, path)
            base_file = os.path.join(base, path)
            dest_file = os.path.join(dest, path)
            shutil.copy2(base_file, dest_file)

    return 0


if __name__ == '__main__':
    sys.exit(main())

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to