Hi Hackers,

Following the advice gathered on the list, I've prepared a third partial
patch towards implementing incremental pg_basebackup as described
here: https://wiki.postgresql.org/wiki/Incremental_backup


== Changes

Compared to the previous version I've made the following changes:

* The backup_profile is not optional anymore. Generating it is cheap
enough not to bother the user with such a choice.

* I've isolated the code which detects the maxLSN of a segment in a
separate getFileMaxLSN function. At the moment it works by scanning the
whole file, but I'm looking to replace it in the next versions.

* I've made it possible to request an incremental backup by passing a "-I
<LSN>" option to pg_basebackup. It is probably too "raw" to remain as
is, but it is useful at this stage to test the code.

* I've modified the backup label to report the fact that the backup was
taken with the incremental option. The result will be something like:

START WAL LOCATION: 0/52000028 (file 000000010000000000000052)
CHECKPOINT LOCATION: 0/52000060
INCREMENTAL FROM LOCATION: 0/51000028
BACKUP METHOD: streamed
BACKUP FROM: master
START TIME: 2014-10-14 16:05:04 CEST
LABEL: pg_basebackup base backup
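For illustration only: the full-file scan mentioned above boils down to reading every page of a segment and keeping the highest page LSN. The rough Python sketch below does the same thing outside the server. It assumes the default 8 kB BLCKSZ and a little-endian server, and reads pd_lsn as the two uint32 halves (xlogid, xrecoff) at the start of the page header; the function names are illustrative, not part of the patch.

```python
import struct

BLCKSZ = 8192  # assumption: default PostgreSQL block size


def page_lsn(page, byteorder="<"):
    """Return pd_lsn from the first 8 bytes of a page header.

    pd_lsn is stored as two uint32 halves (xlogid, xrecoff) in the
    server's native byte order; little-endian ("<") is assumed here.
    """
    xlogid, xrecoff = struct.unpack(byteorder + "II", page[:8])
    return (xlogid << 32) | xrecoff


def file_max_lsn(path, blcksz=BLCKSZ):
    """Scan every full page of a relation segment and return the highest LSN."""
    max_lsn = 0
    with open(path, "rb") as f:
        while True:
            page = f.read(blcksz)
            if len(page) < blcksz:
                break  # end of file; a trailing partial page is ignored
            max_lsn = max(max_lsn, page_lsn(page))
    return max_lsn


def format_lsn(lsn):
    """Format an LSN the way the C code does with "%X/%X"."""
    return "%X/%X" % (lsn >> 32, lsn & 0xFFFFFFFF)
```

Its cost is a full read of every relation file on each backup, which is why a persistent per-segment structure is attractive.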


== Testing it

At this stage you can make an incremental file-level backup using this
procedure:

pg_basebackup -v -F p -D /tmp/x -x
LSN=$(awk '/^START WAL/{print $4}' /tmp/x/backup_profile)
pg_basebackup -v -F p -D /tmp/y -I $LSN -x

The result will be an incremental backup in /tmp/y based on the full
backup in /tmp/x.
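What ends up in /tmp/y is decided per file in sendDir(): anything that is not a relation file is always sent, and a relation file is sent only when its max LSN is greater than the -I start point. A small Python sketch of that rule, plus the LSN parsing the awk one-liner relies on, could look like this (the function names are made up for illustration):

```python
def parse_lsn(text):
    """Convert an "X/Y" LSN, as printed with %X/%X, to a 64-bit integer."""
    hi, lo = text.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def start_wal_location(profile_text):
    """Python equivalent of the awk one-liner: find START WAL LOCATION."""
    for line in profile_text.splitlines():
        if line.startswith("START WAL LOCATION:"):
            return parse_lsn(line.split()[3])
    raise ValueError("no START WAL LOCATION line found")


def should_send(is_relfile, file_max_lsn, incremental_startpoint):
    """Mirror of the patch's condition for sending a file.

    Non-relation files are always sent, a start point of 0 means a full
    backup, and otherwise a relation file is sent only if some page in it
    has an LSN newer than the start point.
    """
    return (not is_relfile
            or incremental_startpoint == 0
            or file_max_lsn > incremental_startpoint)
```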

You can "reintegrate" the incremental backup into the /tmp/z directory
with the following small Python script, invoking it as

./recover.py /tmp/x /tmp/y /tmp/z

----
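#!/usr/bin/env python
# recover.py
from __future__ import print_function

import os
import shutil
import sys

if len(sys.argv) != 4:
    print("usage: %s base incremental destination" % sys.argv[0],
          file=sys.stderr)
    sys.exit(1)

base = sys.argv[1]
incr = sys.argv[2]
dest = sys.argv[3]

if os.path.exists(dest):
    print("error: destination must not exist (%s)" % dest, file=sys.stderr)
    sys.exit(1)

with open(os.path.join(incr, 'backup_profile'), 'r') as profile:
    # Skip the header section (the backup label) up to the file list
    for line in profile:
        if line.strip() == 'FILE LIST':
            break

    # Start from a full copy of the incremental backup
    shutil.copytree(incr, dest)

    # Columns: tablespace, max LSN, sent flag, mtime, size, path
    for line in profile:
        if not line.strip():
            continue
        tblspc, lsn, sent, mtime, size, path = line.rstrip('\n').split('\t', 5)
        # Files that were sent are already in dest; files without a max LSN
        # (\N) are not relation files and never come from the base backup
        if sent == 't' or lsn == '\\N':
            continue
        # Unchanged relation file: take it from the base backup
        shutil.copy2(os.path.join(base, path), os.path.join(dest, path))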
----

It obviously has to be replaced by a full-fledged user tool, but it is
enough to test the concept.
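A proper tool would probably want to parse the FILE LIST rows rather than unpack them ad hoc. The hedged Python sketch below takes the column order from writeBackupProfileLine() in the patch; ProfileEntry, parse_profile_line() and needs_base_copy() are hypothetical names, and the skip rule is the same one recover.py applies.

```python
from collections import namedtuple

# One row of the FILE LIST section, in the column order written by
# writeBackupProfileLine(): tablespace, max LSN, sent flag, mtime, size, path.
ProfileEntry = namedtuple("ProfileEntry",
                          "tablespace max_lsn sent mtime size path")

NULL = "\\N"  # marker the patch writes for a missing tablespace or max LSN


def parse_profile_line(line):
    """Split one tab-separated profile row into typed fields."""
    tblspc, lsn, sent, mtime, size, path = line.rstrip("\n").split("\t", 5)
    if lsn == NULL:
        max_lsn = None
    else:
        hi, lo = lsn.split("/")
        max_lsn = (int(hi, 16) << 32) | int(lo, 16)
    return ProfileEntry(
        tablespace=None if tblspc == NULL else tblspc,
        max_lsn=max_lsn,
        sent=(sent == "t"),
        mtime=int(mtime),
        size=int(size),
        path=path,
    )


def needs_base_copy(entry):
    """True for relation files that were skipped and must come from the base backup."""
    return not entry.sent and entry.max_lsn is not None
```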

== What next

I would like to replace the getFileMaxLSN function with a more-or-less
persistent structure which contains the maxLSN for each data segment.

To make it work, I would hook into the ForwardFsyncRequest() function in
src/backend/postmaster/checkpointer.c and update an in-memory hash every
time a block is going to be fsynced. The structure could be persisted on
disk at some point (probably at checkpoint).

I think a good key for the hash would be a BufferTag with blocknum
"rounded" to the start of the segment.

I'm asking here for comments and advice on how to implement it in an
acceptable way.
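As a rough illustration of the proposed key, here is a Python sketch that rounds a BufferTag-like block number down to the first block of its segment and keeps the highest LSN seen per segment. It assumes the default RELSEG_SIZE of 131072 blocks (1 GB segments of 8 kB pages). SegmentKey and SegmentMaxLSN are hypothetical, and where the LSN would come from inside the checkpointer is left open, as in the proposal. Reporting unknown segments as changed is an assumption made here, so that a missing entry leads to copying a file rather than skipping it.

```python
from typing import Dict, NamedTuple

RELSEG_SIZE = 131072  # assumption: default 1 GB segments of 8 kB blocks


class SegmentKey(NamedTuple):
    """Hypothetical stand-in for a BufferTag with blocknum rounded down."""
    spc_node: int
    db_node: int
    rel_node: int
    fork_num: int
    seg_start: int  # first block number of the segment


def segment_key(spc_node, db_node, rel_node, fork_num, blocknum,
                relseg_size=RELSEG_SIZE):
    """Round blocknum down to the start of its segment to build the key."""
    return SegmentKey(spc_node, db_node, rel_node, fork_num,
                      blocknum - blocknum % relseg_size)


class SegmentMaxLSN:
    """In-memory map from segment key to the highest LSN recorded for it."""

    def __init__(self):
        self._max: Dict[SegmentKey, int] = {}

    def note_write(self, key, lsn):
        """Record that a block of this segment was written at this LSN."""
        if lsn > self._max.get(key, 0):
            self._max[key] = lsn

    def changed_since(self, key, startpoint):
        """Unknown segments count as changed, so they are never skipped."""
        lsn = self._max.get(key)
        return lsn is None or lsn > startpoint
```

Persisting the map (for example at checkpoint) is not shown; the point is only that one entry per segment keeps the structure small.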

== Disclaimer

The code here is an intermediate step: it does not contain any
documentation besides the code comments and will be subject to deep and
radical changes. However, I believe it can be a base to allow PostgreSQL
to have its file-based incremental backup, and a block-based incremental
backup after it.

Regards,
Marco

-- 
Marco Nenciarini - 2ndQuadrant Italy
PostgreSQL Training, Services and Support
marco.nenciar...@2ndquadrant.it | www.2ndQuadrant.it
From 5a7365fc3115c831627c087311c702a79cb355bc Mon Sep 17 00:00:00 2001
From: Marco Nenciarini <marco.nenciar...@2ndquadrant.it>
Date: Tue, 14 Oct 2014 14:31:28 +0100
Subject: [PATCH] File-based incremental backup

Add backup profile to pg_basebackup
INCREMENTAL option naive implementation
---
 src/backend/access/transam/xlog.c      |   7 +-
 src/backend/access/transam/xlogfuncs.c |   2 +-
 src/backend/replication/basebackup.c   | 316 +++++++++++++++++++++++++++++++--
 src/backend/replication/repl_gram.y    |   6 +
 src/backend/replication/repl_scanner.l |   1 +
 src/bin/pg_basebackup/pg_basebackup.c  |  83 +++++++--
 src/include/access/xlog.h              |   3 +-
 src/include/replication/basebackup.h   |   4 +
 8 files changed, 391 insertions(+), 31 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 235b442..4dc79f0 100644
*** a/src/backend/access/transam/xlog.c
--- b/src/backend/access/transam/xlog.c
*************** XLogFileNameP(TimeLineID tli, XLogSegNo 
*** 9718,9724 ****
   * permissions of the calling user!
   */
  XLogRecPtr
! do_pg_start_backup(const char *backupidstr, bool fast, TimeLineID *starttli_p,
                                   char **labelfile)
  {
        bool            exclusive = (labelfile == NULL);
--- 9718,9725 ----
   * permissions of the calling user!
   */
  XLogRecPtr
! do_pg_start_backup(const char *backupidstr, bool fast,
!                                  XLogRecPtr incremental_startpoint, TimeLineID *starttli_p,
                                   char **labelfile)
  {
        bool            exclusive = (labelfile == NULL);
*************** do_pg_start_backup(const char *backupids
*** 9936,9941 ****
--- 9937,9946 ----
                         (uint32) (startpoint >> 32), (uint32) startpoint, xlogfilename);
                appendStringInfo(&labelfbuf, "CHECKPOINT LOCATION: %X/%X\n",
                                         (uint32) (checkpointloc >> 32), (uint32) checkpointloc);
+               if (incremental_startpoint > 0)
+                       appendStringInfo(&labelfbuf, "INCREMENTAL FROM LOCATION: %X/%X\n",
+                                                        (uint32) (incremental_startpoint >> 32),
+                                                        (uint32) incremental_startpoint);
                appendStringInfo(&labelfbuf, "BACKUP METHOD: %s\n",
                                                 exclusive ? "pg_start_backup" : "streamed");
                appendStringInfo(&labelfbuf, "BACKUP FROM: %s\n",
diff --git a/src/backend/access/transam/xlogfuncs.c b/src/backend/access/transam/xlogfuncs.c
index 133143d..f1248fa 100644
*** a/src/backend/access/transam/xlogfuncs.c
--- b/src/backend/access/transam/xlogfuncs.c
*************** pg_start_backup(PG_FUNCTION_ARGS)
*** 59,65 ****
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                   errmsg("must be superuser or replication role to run a backup")));
  
!       startpoint = do_pg_start_backup(backupidstr, fast, NULL, NULL);
  
        PG_RETURN_LSN(startpoint);
  }
--- 59,65 ----
                                (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                   errmsg("must be superuser or replication role to run a backup")));
  
!       startpoint = do_pg_start_backup(backupidstr, fast, 0, NULL, NULL);
  
        PG_RETURN_LSN(startpoint);
  }
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index fbcecbb..26f3b8e 100644
*** a/src/backend/replication/basebackup.c
--- b/src/backend/replication/basebackup.c
***************
*** 30,40 ****
--- 30,42 ----
  #include "replication/basebackup.h"
  #include "replication/walsender.h"
  #include "replication/walsender_private.h"
+ #include "storage/bufpage.h"
  #include "storage/fd.h"
  #include "storage/ipc.h"
  #include "utils/builtins.h"
  #include "utils/elog.h"
  #include "utils/ps_status.h"
+ #include "utils/pg_lsn.h"
  #include "utils/timestamp.h"
  
  
*************** typedef struct
*** 46,56 ****
        bool            nowait;
        bool            includewal;
        uint32          maxrate;
  } basebackup_options;
  
  
! static int64 sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces);
! static int64 sendTablespace(char *path, bool sizeonly);
  static bool sendFile(char *readfilename, char *tarfilename,
                 struct stat * statbuf, bool missing_ok);
  static void sendFileWithContent(const char *filename, const char *content);
--- 48,62 ----
        bool            nowait;
        bool            includewal;
        uint32          maxrate;
+       XLogRecPtr      incremental_startpoint;
  } basebackup_options;
  
  
! static int64 sendDir(char *path, int basepathlen, bool sizeonly,
!                                        List *tablespaces, bool has_relfiles,
!                                        XLogRecPtr incremental_startpoint);
! static int64 sendTablespace(char *path, bool sizeonly,
!                               XLogRecPtr incremental_startpoint);
  static bool sendFile(char *readfilename, char *tarfilename,
                 struct stat * statbuf, bool missing_ok);
  static void sendFileWithContent(const char *filename, const char *content);
*************** static void parse_basebackup_options(Lis
*** 64,69 ****
--- 70,81 ----
  static void SendXlogRecPtrResult(XLogRecPtr ptr, TimeLineID tli);
  static int    compareWalFileNames(const void *a, const void *b);
  static void throttle(size_t increment);
+ static bool getFileMaxLSN(char *filename, struct stat * statbuf,
+                               XLogRecPtr *filemaxlsn);
+ static void writeBackupProfileLine(const char *filename, struct stat * statbuf,
+                                    bool is_relfile, XLogRecPtr filemaxlsn, bool sent);
+ static void sendBackupProfile(const char *labelfile);
+ static bool validateRelfilenodeName(char *name);
  
  /* Was the backup currently in-progress initiated in recovery mode? */
  static bool backup_started_in_recovery = false;
*************** static int64 elapsed_min_unit;
*** 93,98 ****
--- 105,116 ----
  /* The last check of the transfer rate. */
  static int64 throttled_last;
  
+ /* Temporary file containing the backup profile */
+ static File backup_profile_fd = 0;
+ 
+ /* Tablespace being currently sent. Used in backup profile generation */
+ static char *current_tablespace = NULL;
+ 
  typedef struct
  {
        char       *oid;
*************** perform_base_backup(basebackup_options *
*** 132,138 ****
  
        backup_started_in_recovery = RecoveryInProgress();
  
!       startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &starttli,
                                                                  &labelfile);
        /*
         * Once do_pg_start_backup has been called, ensure that any failure causes
--- 150,160 ----
  
        backup_started_in_recovery = RecoveryInProgress();
  
!       /* Open a temporary file to hold the profile content. */
!       backup_profile_fd = OpenTemporaryFile(false);
! 
!       startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint,
!                                                                 opt->incremental_startpoint, &starttli,
                                                                  &labelfile);
        /*
         * Once do_pg_start_backup has been called, ensure that any failure causes
*************** perform_base_backup(basebackup_options *
*** 208,214 ****
                        ti->oid = pstrdup(de->d_name);
                        ti->path = pstrdup(linkpath);
                        ti->rpath = relpath ? pstrdup(relpath) : NULL;
!                       ti->size = opt->progress ? sendTablespace(fullpath, true) : -1;
                        tablespaces = lappend(tablespaces, ti);
  #else
  
--- 230,237 ----
                        ti->oid = pstrdup(de->d_name);
                        ti->path = pstrdup(linkpath);
                        ti->rpath = relpath ? pstrdup(relpath) : NULL;
!                       ti->size = opt->progress ? sendTablespace(fullpath, true,
!                                                                 opt->incremental_startpoint) : -1;
                        tablespaces = lappend(tablespaces, ti);
  #else
  
*************** perform_base_backup(basebackup_options *
*** 225,231 ****
  
                /* Add a node for the base directory at the end */
                ti = palloc0(sizeof(tablespaceinfo));
!               ti->size = opt->progress ? sendDir(".", 1, true, tablespaces) : -1;
                tablespaces = lappend(tablespaces, ti);
  
                /* Send tablespace header */
--- 248,255 ----
  
                /* Add a node for the base directory at the end */
                ti = palloc0(sizeof(tablespaceinfo));
!               ti->size = opt->progress ? sendDir(".", 1, true, tablespaces, false,
!                                                  opt->incremental_startpoint) : -1;
                tablespaces = lappend(tablespaces, ti);
  
                /* Send tablespace header */
*************** perform_base_backup(basebackup_options *
*** 267,272 ****
--- 291,302 ----
                        pq_sendint(&buf, 0, 2);         /* natts */
                        pq_endmessage(&buf);
  
+                       /*
+                        * Save the current tablespace, used in writeBackupProfileLine
+                        * function
+                        */
+                       current_tablespace = ti->oid;
+ 
                        if (ti->path == NULL)
                        {
                                struct stat statbuf;
*************** perform_base_backup(basebackup_options *
*** 275,281 ****
                                sendFileWithContent(BACKUP_LABEL_FILE, labelfile);
  
                                /* ... then the bulk of the files ... */
!                               sendDir(".", 1, false, tablespaces);
  
                                /* ... and pg_control after everything else. */
                                if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
--- 305,311 ----
                                sendFileWithContent(BACKUP_LABEL_FILE, labelfile);
  
                                /* ... then the bulk of the files ... */
!                               sendDir(".", 1, false, tablespaces, false, opt->incremental_startpoint);
  
                                /* ... and pg_control after everything else. */
                                if (lstat(XLOG_CONTROL_FILE, &statbuf) != 0)
*************** perform_base_backup(basebackup_options *
*** 284,292 ****
                                                         errmsg("could not stat control file \"%s\": %m",
                                                                        XLOG_CONTROL_FILE)));
                                sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false);
                        }
                        else
!                               sendTablespace(ti->path, false);
  
                        /*
                         * If we're including WAL, and this is the main data directory we
--- 314,323 ----
                                                         errmsg("could not stat control file \"%s\": %m",
                                                                        XLOG_CONTROL_FILE)));
                                sendFile(XLOG_CONTROL_FILE, XLOG_CONTROL_FILE, &statbuf, false);
+                               writeBackupProfileLine(XLOG_CONTROL_FILE, &statbuf, false, 0, true);
                        }
                        else
!                               sendTablespace(ti->path, false, opt->incremental_startpoint);
  
                        /*
                         * If we're including WAL, and this is the main data directory we
*************** perform_base_backup(basebackup_options *
*** 498,503 ****
--- 529,536 ----
  
                        /* XLogSegSize is a multiple of 512, so no need for padding */
                        FreeFile(fp);
+ 
+                       writeBackupProfileLine(pathbuf, &statbuf, false, 0, true);
                }
  
                /*
*************** perform_base_backup(basebackup_options *
*** 521,532 ****
--- 554,569 ----
                                                 errmsg("could not stat file \"%s\": %m", pathbuf)));
  
                        sendFile(pathbuf, pathbuf, &statbuf, false);
+                       writeBackupProfileLine(pathbuf, &statbuf, false, 0, true);
                }
  
                /* Send CopyDone message for the last tar file */
                pq_putemptymessage('c');
        }
        SendXlogRecPtrResult(endptr, endtli);
+ 
+       /* Send the profile file. */
+       sendBackupProfile(labelfile);
  }
  
  /*
*************** parse_basebackup_options(List *options, 
*** 555,560 ****
--- 592,598 ----
        bool            o_nowait = false;
        bool            o_wal = false;
        bool            o_maxrate = false;
+       bool            o_incremental = false;
  
        MemSet(opt, 0, sizeof(*opt));
        foreach(lopt, options)
*************** parse_basebackup_options(List *options, 
*** 625,630 ****
--- 663,680 ----
                        opt->maxrate = (uint32) maxrate;
                        o_maxrate = true;
                }
+               else if (strcmp(defel->defname, "incremental") == 0)
+               {
+                       if (o_incremental)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                errmsg("duplicate option \"%s\"", defel->defname)));
+ 
+                       opt->incremental_startpoint = DatumGetLSN(
+                               DirectFunctionCall1(pg_lsn_in,
+                                                   CStringGetDatum(strVal(defel->arg))));
+                       o_incremental = true;
+               }
                else
                        elog(ERROR, "option \"%s\" not recognized",
                                 defel->defname);
*************** sendFileWithContent(const char *filename
*** 844,849 ****
--- 894,902 ----
                MemSet(buf, 0, pad);
                pq_putmessage('d', buf, pad);
        }
+ 
+       /* Write a backup profile entry for this file. */
+       writeBackupProfileLine(filename, &statbuf, false, 0, true);
  }
  
  /*
*************** sendFileWithContent(const char *filename
*** 854,860 ****
   * Only used to send auxiliary tablespaces, not PGDATA.
   */
  static int64
! sendTablespace(char *path, bool sizeonly)
  {
        int64           size;
        char            pathbuf[MAXPGPATH];
--- 907,913 ----
   * Only used to send auxiliary tablespaces, not PGDATA.
   */
  static int64
! sendTablespace(char *path, bool sizeonly, XLogRecPtr incremental_startpoint)
  {
        int64           size;
        char            pathbuf[MAXPGPATH];
*************** sendTablespace(char *path, bool sizeonly
*** 887,893 ****
        size = 512;                                     /* Size of the header just added */
  
        /* Send all the files in the tablespace version directory */
!       size += sendDir(pathbuf, strlen(path), sizeonly, NIL);
  
        return size;
  }
--- 940,946 ----
        size = 512;                                     /* Size of the header just added */
  
        /* Send all the files in the tablespace version directory */
!       size += sendDir(pathbuf, strlen(path), sizeonly, NIL, true, incremental_startpoint);
  
        return size;
  }
*************** sendTablespace(char *path, bool sizeonly
*** 899,907 ****
   *
   * Omit any directory in the tablespaces list, to avoid backing up
   * tablespaces twice when they were created inside PGDATA.
   */
  static int64
! sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces)
  {
        DIR                *dir;
        struct dirent *de;
--- 952,964 ----
   *
   * Omit any directory in the tablespaces list, to avoid backing up
   * tablespaces twice when they were created inside PGDATA.
+  *
+  * If 'has_relfiles' is set, this directory will be checked to identify
+  * relnode files and compute their maxLSN.
   */
  static int64
! sendDir(char *path, int basepathlen, bool sizeonly, List *tablespaces,
!               bool has_relfiles, XLogRecPtr incremental_startpoint)
  {
        DIR                *dir;
        struct dirent *de;
*************** sendDir(char *path, int basepathlen, boo
*** 1100,1114 ****
                                }
                        }
                        if (!skip_this_dir)
!                               size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces);
                }
                else if (S_ISREG(statbuf.st_mode))
                {
                        bool            sent = false;
  
                        if (!sizeonly)
!                               sent = sendFile(pathbuf, pathbuf + basepathlen + 1, &statbuf,
!                                                               true);
  
                        if (sent || sizeonly)
                        {
--- 1157,1209 ----
                                }
                        }
                        if (!skip_this_dir)
!                       {
!                               bool    subdir_has_relfiles;
! 
!                               /*
!                                * Within PGDATA, relnode files are contained only in the "global"
!                                * and "base" directories
!                                */
!                               subdir_has_relfiles = has_relfiles
!                                       || strcmp(pathbuf, "./global") == 0
!                                       || strcmp(pathbuf, "./base") == 0;
! 
!                               size += sendDir(pathbuf, basepathlen, sizeonly, tablespaces,
!                                                               subdir_has_relfiles, incremental_startpoint);
!                       }
                }
                else if (S_ISREG(statbuf.st_mode))
                {
                        bool            sent = false;
  
                        if (!sizeonly)
!                       {
!                               bool            is_relfile;
!                               XLogRecPtr      filemaxlsn = 0;
! 
!                               /*
!                                * If the current directory can have relnode files, check the file
!                                * name to see if it is one of them.
!                                */
!                               is_relfile = has_relfiles && validateRelfilenodeName(de->d_name);
! 
!                               /*
!                                * If is_relfile, get the MaxLSN. If unable to get the MaxLSN,
!                                * set is_relfile to false.
!                                */
!                               is_relfile = is_relfile && getFileMaxLSN(pathbuf, &statbuf,
!                                                                        &filemaxlsn);
! 
!                               if (!is_relfile
!                                       || incremental_startpoint == 0
!                                       || filemaxlsn > incremental_startpoint)
!                                       sent = sendFile(pathbuf, pathbuf + basepathlen + 1,
!                                                                       &statbuf, true);
! 
!                               /* Write a backup profile entry for this file. */
!                               writeBackupProfileLine(pathbuf + basepathlen + 1, &statbuf,
!                                                                          is_relfile, filemaxlsn, sent);
!                       }
  
                        if (sent || sizeonly)
                        {
*************** throttle(size_t increment)
*** 1303,1305 ****
--- 1398,1595 ----
                /* Sleep was necessary but might have been interrupted. */
                throttled_last = GetCurrentIntegerTimestamp();
  }
+ 
+ /*
+  * Read the maximum page LSN of a data file (relnode file).
+  */
+ static bool
+ getFileMaxLSN(char *filename, struct stat * statbuf, XLogRecPtr *filemaxlsn)
+ {
+       FILE       *fp;
+       char            buf[BLCKSZ];
+       size_t          cnt;
+       pgoff_t         len = 0;
+       XLogRecPtr      pagelsn;
+ 
+       *filemaxlsn = 0;
+ 
+       fp = AllocateFile(filename, "rb");
+       if (fp == NULL)
+       {
+               if (errno == ENOENT)
+                       return false;
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not open file \"%s\": %m", filename)));
+       }
+ 
+       while ((cnt = fread(buf, 1, Min(sizeof(buf), statbuf->st_size - len), fp)) > 0)
+       {
+               pagelsn = PageGetLSN(buf);
+               if (*filemaxlsn < pagelsn)
+                       *filemaxlsn = pagelsn;
+ 
+               if ((len += cnt) >= statbuf->st_size)
+               {
+                       /*
+                        * Reached end of file. The file could be longer, if it was
+                        * extended while we were reading it, but for a base backup we can
+                        * ignore such extended data. It will be restored from WAL.
+                        */
+                       break;
+               }
+       }
+ 
+       FreeFile(fp);
+       return true;
+ }
+ 
+ /*
+  * Write an entry in the file list section of the backup profile.
+  */
+ static void
+ writeBackupProfileLine(const char *filename, struct stat * statbuf,
+                                          bool is_relfile, XLogRecPtr filemaxlsn, bool sent)
+ {
+       /*
+        * tablespace oid (10) + max LSN (17) + sent flag (1) + mtime (10) +
+        * size (19) + path (MAXPGPATH) + separators and newline (6) + \0 (1) = 64
+        */
+       char    buf[MAXPGPATH + 64];
+       char    maxlsn[18];             /* up to "FFFFFFFF/FFFFFFFF" plus \0 */
+       int             rowlen;
+ 
+       Assert(backup_profile_fd > 0);
+ 
+       /* Prepare maxlsn */
+       if (is_relfile)
+       {
+               snprintf(maxlsn, sizeof(maxlsn), "%X/%X",
+                                (uint32) (filemaxlsn >> 32), (uint32) filemaxlsn);
+       }
+       else
+       {
+               strlcpy(maxlsn, "\\N", sizeof(maxlsn));
+       }
+ 
+       rowlen = snprintf(buf, sizeof(buf), "%s\t%s\t%s\t%u\t" INT64_FORMAT "\t%s\n",
+                                         current_tablespace ? current_tablespace : "\\N",
+                                         maxlsn,
+                                         sent ? "t" : "f",
+                                         (uint32) statbuf->st_mtime,
+                                         (int64) statbuf->st_size,
+                                         filename);
+       FileWrite(backup_profile_fd, buf, rowlen);
+ }
+ 
+ /*
+  * Send the backup profile. It is wrapped in a tar CopyOutResponse containing
+  * a tar stream with only one file.
+  */
+ static void
+ sendBackupProfile(const char *labelfile)
+ {
+       StringInfoData msgbuf;
+       struct stat statbuf;
+       char            buf[TAR_SEND_SIZE];
+       size_t          cnt;
+       pgoff_t         len = 0;
+       size_t          pad;
+       char *backup_profile = FilePathName(backup_profile_fd);
+ 
+       /* Send CopyOutResponse message */
+       pq_beginmessage(&msgbuf, 'H');
+       pq_sendbyte(&msgbuf, 0);                /* overall format */
+       pq_sendint(&msgbuf, 0, 2);              /* natts */
+       pq_endmessage(&msgbuf);
+ 
+       if (lstat(backup_profile, &statbuf) != 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not stat backup_profile file \"%s\": %m",
+                                               backup_profile)));
+ 
+       /* Set the file position to the beginning. */
+       FileSeek(backup_profile_fd, 0, SEEK_SET);
+ 
+       /*
+        * Fill the buffer with the content of the backup profile header section.
+        * Since it is the concatenation of two separators and the backup label,
+        * it should be shorter than TAR_SEND_SIZE.
+        */
+       cnt = snprintf(buf, sizeof(buf), "%s\n%s%s\n",
+                                  BACKUP_PROFILE_HEADER,
+                                  labelfile,
+                                  BACKUP_PROFILE_SEPARATOR);
+ 
+       /* Add size of backup label and separators */
+       statbuf.st_size += cnt;
+ 
+       _tarWriteHeader(BACKUP_PROFILE_FILE, NULL, &statbuf);
+ 
+       /* Send backup profile header */
+       if (pq_putmessage('d', buf, cnt))
+               ereport(ERROR,
+                               (errmsg("base backup could not send data, aborting backup")));
+ 
+       len += cnt;
+       throttle(cnt);
+ 
+       while ((cnt = FileRead(backup_profile_fd, buf, sizeof(buf))) > 0)
+       {
+               /* Send the chunk as a CopyData message */
+               if (pq_putmessage('d', buf, cnt))
+                       ereport(ERROR,
+                                       (errmsg("base backup could not send data, aborting backup")));
+ 
+               len += cnt;
+               throttle(cnt);
+ 
+       }
+ 
+       /*
+        * Pad to 512 byte boundary, per tar format requirements. (This small
+        * piece of data is probably not worth throttling.)
+        */
+       pad = ((len + 511) & ~511) - len;
+       if (pad > 0)
+       {
+               MemSet(buf, 0, pad);
+               pq_putmessage('d', buf, pad);
+       }
+ 
+       pq_putemptymessage('c');        /* CopyDone */
+ }
+ 
+ /*
+  * relfilenode name validation.
+  *
+  * Accepted format (loosely checked): digits, an optional fork suffix made of
+  * "_" and lowercase letters (_vm, _fsm, _init), and an optional ".N" segment.
+  */
+ static bool
+ validateRelfilenodeName(char *name)
+ {
+       int                     pos = 0;
+ 
+       while ((name[pos] >= '0') && (name[pos] <= '9'))
+               pos++;
+ 
+       if (name[pos] == '_')
+       {
+               pos++;
+               while ((name[pos] >= 'a') && (name[pos] <= 'z'))
+                       pos++;
+       }
+       if (name[pos] == '.')
+       {
+               pos++;
+               while ((name[pos] >= '0') && (name[pos] <= '9'))
+                       pos++;
+       }
+ 
+       if (name[pos] == 0)
+               return true;
+ 
+       return false;
+ }
diff --git a/src/backend/replication/repl_gram.y b/src/backend/replication/repl_gram.y
index 154aaac..97f1091 100644
*** a/src/backend/replication/repl_gram.y
--- b/src/backend/replication/repl_gram.y
*************** Node *replication_parse_result;
*** 75,80 ****
--- 75,81 ----
  %token K_PHYSICAL
  %token K_LOGICAL
  %token K_SLOT
+ %token K_INCREMENTAL
  
  %type <node>  command
  %type <node>  base_backup start_replication start_logical_replication create_replication_slot drop_replication_slot identify_system timeline_history
*************** base_backup_opt:
*** 168,173 ****
--- 169,179 ----
                                  $$ = makeDefElem("max_rate",
                                                                   (Node *)makeInteger($2));
                                }
+                       | K_INCREMENTAL SCONST
+                               {
+                                 $$ = makeDefElem("incremental",
+                                                                  (Node *)makeString($2));
+                               }
                        ;
  
  create_replication_slot:
diff --git a/src/backend/replication/repl_scanner.l b/src/backend/replication/repl_scanner.l
index a257124..74c5119 100644
*** a/src/backend/replication/repl_scanner.l
--- b/src/backend/replication/repl_scanner.l
*************** TIMELINE_HISTORY        { return K_TIMELINE_HIS
*** 96,101 ****
--- 96,102 ----
  PHYSICAL                      { return K_PHYSICAL; }
  LOGICAL                               { return K_LOGICAL; }
  SLOT                          { return K_SLOT; }
+ INCREMENTAL                   { return K_INCREMENTAL; }
  
  ","                           { return ','; }
  ";"                           { return ';'; }
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 0ebda9a..9902a8a 100644
*** a/src/bin/pg_basebackup/pg_basebackup.c
--- b/src/bin/pg_basebackup/pg_basebackup.c
*************** static bool writerecoveryconf = false;
*** 66,71 ****
--- 66,72 ----
  static int    standby_message_timeout = 10 * 1000;            /* 10 sec = default */
  static pg_time_t last_progress_report = 0;
  static int32 maxrate = 0;             /* no limit by default */
+ static XLogRecPtr incremental_startpoint = 0;
  
  
  /* Progress counters */
*************** static void verify_dir_is_empty_or_creat
*** 100,106 ****
  static void progress_report(int tablespacenum, const char *filename, bool force);
  
  static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
! static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum);
  static void GenerateRecoveryConf(PGconn *conn);
  static void WriteRecoveryConf(void);
  static void BaseBackup(void);
--- 101,108 ----
  static void progress_report(int tablespacenum, const char *filename, bool force);
  
  static void ReceiveTarFile(PGconn *conn, PGresult *res, int rownum);
! static void ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum,
!                                                                       const char *dest_path);
  static void GenerateRecoveryConf(PGconn *conn);
  static void WriteRecoveryConf(void);
  static void BaseBackup(void);
*************** usage(void)
*** 231,236 ****
--- 233,240 ----
        printf(_("\nOptions controlling the output:\n"));
        printf(_("  -D, --pgdata=DIRECTORY receive base backup into directory\n"));
        printf(_("  -F, --format=p|t       output format (plain (default), tar)\n"));
+       printf(_("  -I, --incremental=STARTPOINT\n"
+                        "                         send only changes after STARTPOINT\n"));
        printf(_("  -r, --max-rate=RATE    maximum transfer rate to transfer data directory\n"
                         "                         (in kB/s, or use suffix \"k\" or \"M\")\n"));
        printf(_("  -R, --write-recovery-conf\n"
*************** get_tablespace_mapping(const char *dir)
*** 1116,1124 ****
   * If the data is for the main data directory, it will be restored in the
   * specified directory. If it's for another tablespace, it will be restored
   * in the original or mapped directory.
   */
  static void
! ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum)
  {
        char            current_path[MAXPGPATH];
        char            filename[MAXPGPATH];
--- 1120,1135 ----
   * If the data is for the main data directory, it will be restored in the
   * specified directory. If it's for another tablespace, it will be restored
   * in the original or mapped directory.
+  *
+  * If 'res' is NULL, the destination directory is taken from the
+  * 'dest_path' parameter.
+  *
+  * When 'dest_path' is specified, progress is not reported, because the
+  * content does not belong to any tablespace.
   */
  static void
! ReceiveAndUnpackTarFile(PGconn *conn, PGresult *res, int rownum,
!                                               const char *dest_path)
  {
        char            current_path[MAXPGPATH];
        char            filename[MAXPGPATH];
*************** ReceiveAndUnpackTarFile(PGconn *conn, PG
*** 1129,1141 ****
        char       *copybuf = NULL;
        FILE       *file = NULL;
  
!       basetablespace = PQgetisnull(res, rownum, 0);
!       if (basetablespace)
!               strlcpy(current_path, basedir, sizeof(current_path));
        else
!               strlcpy(current_path,
!                               get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
!                               sizeof(current_path));
  
        /*
         * Get the COPY data
--- 1140,1167 ----
        char       *copybuf = NULL;
        FILE       *file = NULL;
  
!       /* 'res' and 'dest_path' are mutually exclusive */
!       Assert(!res != !dest_path);
! 
!       /*
!        * If 'res' is NULL, the destination directory is taken from the
!        * 'dest_path' parameter.
!        */
!       if (res)
!       {
!               basetablespace = PQgetisnull(res, rownum, 0);
!               if (basetablespace)
!                       strlcpy(current_path, basedir, sizeof(current_path));
!               else
!                       strlcpy(current_path,
!                                       get_tablespace_mapping(PQgetvalue(res, rownum, 1)),
!                                       sizeof(current_path));
!       }
        else
!       {
!               basetablespace = false;
!               strlcpy(current_path, dest_path, sizeof(current_path));
!       }
  
        /*
         * Get the COPY data
*************** ReceiveAndUnpackTarFile(PGconn *conn, PG
*** 1342,1348 ****
                                disconnect_and_exit(1);
                        }
                        totaldone += r;
!                       progress_report(rownum, filename, false);
  
                        current_len_left -= r;
                        if (current_len_left == 0 && current_padding == 0)
--- 1368,1376 ----
                                disconnect_and_exit(1);
                        }
                        totaldone += r;
!                       /* report progress unless a custom destination is used */
!                       if (!dest_path)
!                               progress_report(rownum, filename, false);
  
                        current_len_left -= r;
                        if (current_len_left == 0 && current_padding == 0)
*************** ReceiveAndUnpackTarFile(PGconn *conn, PG
*** 1358,1364 ****
                        }
                }                                               /* continuing data in existing file */
        }                                                       /* loop over all data blocks */
!       progress_report(rownum, filename, true);
  
        if (file != NULL)
        {
--- 1386,1394 ----
                        }
                }                                               /* continuing data in existing file */
        }                                                       /* loop over all data blocks */
!       /* report progress unless a custom destination is used */
!       if (!dest_path)
!               progress_report(rownum, filename, true);
  
        if (file != NULL)
        {
*************** BaseBackup(void)
*** 1574,1579 ****
--- 1604,1610 ----
        char       *basebkp;
        char            escaped_label[MAXPGPATH];
        char       *maxrate_clause = NULL;
+       char       *incremental_clause = NULL;
        int                     i;
        char            xlogstart[64];
        char            xlogend[64];
*************** BaseBackup(void)
*** 1635,1648 ****
        if (maxrate > 0)
                maxrate_clause = psprintf("MAX_RATE %u", maxrate);
  
        basebkp =
!               psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s",
                                 escaped_label,
                                 showprogress ? "PROGRESS" : "",
                                 includewal && !streamwal ? "WAL" : "",
                                 fastcheckpoint ? "FAST" : "",
                                 includewal ? "NOWAIT" : "",
!                                maxrate_clause ? maxrate_clause : "");
  
        if (PQsendQuery(conn, basebkp) == 0)
        {
--- 1666,1685 ----
        if (maxrate > 0)
                maxrate_clause = psprintf("MAX_RATE %u", maxrate);
  
+       if (incremental_startpoint > 0)
+               incremental_clause = psprintf("INCREMENTAL '%X/%X'",
+                                                                         (uint32) (incremental_startpoint >> 32),
+                                                                         (uint32) incremental_startpoint);
+ 
        basebkp =
!               psprintf("BASE_BACKUP LABEL '%s' %s %s %s %s %s %s",
                                 escaped_label,
                                 showprogress ? "PROGRESS" : "",
                                 includewal && !streamwal ? "WAL" : "",
                                 fastcheckpoint ? "FAST" : "",
                                 includewal ? "NOWAIT" : "",
!                                maxrate_clause ? maxrate_clause : "",
!                                incremental_clause ? incremental_clause : "");
  
        if (PQsendQuery(conn, basebkp) == 0)
        {
*************** BaseBackup(void)
*** 1756,1762 ****
                if (format == 't')
                        ReceiveTarFile(conn, res, i);
                else
!                       ReceiveAndUnpackTarFile(conn, res, i);
        }                                                       /* Loop over all tablespaces */
  
        if (showprogress)
--- 1793,1799 ----
                if (format == 't')
                        ReceiveTarFile(conn, res, i);
                else
!                       ReceiveAndUnpackTarFile(conn, res, i, NULL);
        }                                                       /* Loop over all tablespaces */
  
        if (showprogress)
*************** BaseBackup(void)
*** 1790,1795 ****
--- 1827,1837 ----
                fprintf(stderr, "transaction log end point: %s\n", xlogend);
        PQclear(res);
  
+       /*
+        * Get the backup profile
+        */
+       ReceiveAndUnpackTarFile(conn, NULL, -1, basedir);
+ 
        res = PQgetResult(conn);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
        {
*************** main(int argc, char **argv)
*** 1929,1934 ****
--- 1971,1977 ----
                {"username", required_argument, NULL, 'U'},
                {"no-password", no_argument, NULL, 'w'},
                {"password", no_argument, NULL, 'W'},
+               {"incremental", required_argument, NULL, 'I'},
                {"status-interval", required_argument, NULL, 's'},
                {"verbose", no_argument, NULL, 'v'},
                {"progress", no_argument, NULL, 'P'},
*************** main(int argc, char **argv)
*** 1936,1942 ****
                {NULL, 0, NULL, 0}
        };
        int                     c;
! 
        int                     option_index;
  
        progname = get_progname(argv[0]);
--- 1979,1985 ----
                {NULL, 0, NULL, 0}
        };
        int                     c;
!       uint32          hi, lo;
        int                     option_index;
  
        progname = get_progname(argv[0]);
*************** main(int argc, char **argv)
*** 1957,1963 ****
                }
        }
  
!       while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:wWvP",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
--- 2000,2006 ----
                }
        }
  
!       while ((c = getopt_long(argc, argv, "D:F:r:RT:xX:l:zZ:d:c:h:p:U:s:wWI:vP",
                                                        long_options, &option_index)) != -1)
        {
                switch (c)
*************** main(int argc, char **argv)
*** 2075,2080 ****
--- 2118,2133 ----
                        case 'W':
                                dbgetpassword = 1;
                                break;
+                       case 'I':
+                               if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
+                               {
+                                       fprintf(stderr,
+                                                       _("%s: could not parse incremental start position \"%s\"\n"),
+                                                       progname, optarg);
+                                       exit(1);
+                               }
+                               incremental_startpoint = ((uint64) hi << 32) | lo;
+                               break;
                        case 's':
                                standby_message_timeout = atoi(optarg) * 1000;
                                if (standby_message_timeout < 0)
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 0f068d9..91d05d5 100644
*** a/src/include/access/xlog.h
--- b/src/include/access/xlog.h
*************** extern void SetWalWriterSleeping(bool sl
*** 349,355 ****
   * Starting/stopping a base backup
   */
  extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
!                                  TimeLineID *starttli_p, char **labelfile);
  extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
                                  TimeLineID *stoptli_p);
  extern void do_pg_abort_backup(void);
--- 349,356 ----
   * Starting/stopping a base backup
   */
  extern XLogRecPtr do_pg_start_backup(const char *backupidstr, bool fast,
!                                 XLogRecPtr incremental_startpoint,
!                                 TimeLineID *starttli_p, char **labelfile);
  extern XLogRecPtr do_pg_stop_backup(char *labelfile, bool waitforarchive,
                                  TimeLineID *stoptli_p);
  extern void do_pg_abort_backup(void);
diff --git a/src/include/replication/basebackup.h b/src/include/replication/basebackup.h
index 988bce7..9210e67 100644
*** a/src/include/replication/basebackup.h
--- b/src/include/replication/basebackup.h
***************
*** 20,25 ****
--- 20,29 ----
  #define MAX_RATE_LOWER        32
  #define MAX_RATE_UPPER        1048576
  
+ /* Backup profile */
+ #define BACKUP_PROFILE_FILE                   "backup_profile"
+ #define BACKUP_PROFILE_HEADER         "POSTGRESQL BACKUP PROFILE 1"
+ #define BACKUP_PROFILE_SEPARATOR      "FILE LIST"
  
  extern void SendBaseBackup(BaseBackupCmd *cmd);
  
-- 
2.1.2

