From 78b738c97733c7b0ac342fcbd223cd3ac7a132fa Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Thu, 22 Feb 2024 15:02:28 +0530
Subject: [PATCH v1] Fix incremental backup interaction with
 XLOG_DBASE_CREATE_FILE_COPY.

After XLOG_DBASE_CREATE_FILE_COPY, a correct incremental backup needs
to copy in full everything with the database and tablespace OID
mentioned in that record; but that record doesn't specifically mention
the blocks, or even the relfilenumbers, of the affected relations.
As a result, we were failing to copy data that we should have copied.

To fix, enter the DB OID and tablespace OID into the block reference
table with relfilenumber 0 and limit block 0; and treat that as a
limit block of 0 for every relfilenumber whose DB OID and tablespace
OID match.

Also, add a test case.
---
 src/backend/backup/basebackup_incremental.c   | 18 ++++-
 src/backend/postmaster/walsummarizer.c        | 75 +++++++++++++++++++
 src/bin/pg_combinebackup/meson.build          |  1 +
 .../pg_combinebackup/t/006_db_file_copy.pl    | 58 ++++++++++++++
 4 files changed, 151 insertions(+), 1 deletion(-)
 create mode 100644 src/bin/pg_combinebackup/t/006_db_file_copy.pl

diff --git a/src/backend/backup/basebackup_incremental.c b/src/backend/backup/basebackup_incremental.c
index e994ee66bb..3bd4979f29 100644
--- a/src/backend/backup/basebackup_incremental.c
+++ b/src/backend/backup/basebackup_incremental.c
@@ -777,9 +777,25 @@ GetFileBackupMethod(IncrementalBackupInfo *ib, const char *path,
 			return BACK_UP_FILE_FULLY;
 	}
 
-	/* Look up the block reference table entry. */
+	/*
+	 * Look up the special block reference table entry for the database as
+	 * a whole.
+	 */
 	rlocator.spcOid = spcoid;
 	rlocator.dbOid = dboid;
+	rlocator.relNumber = 0;
+	if (BlockRefTableGetEntry(ib->brtab, &rlocator, MAIN_FORKNUM,
+							  &limit_block) != NULL)
+	{
+		/*
+		 * According to the WAL summary, this database OID/tablespace OID
+		 * pairing has been created since the previous backup. So, everything
+		 * in it must be backed up fully.
+		 */
+		return BACK_UP_FILE_FULLY;
+	}
+
+	/* Look up the block reference table entry for this relfilenode. */
 	rlocator.relNumber = relfilenumber;
 	brtentry = BlockRefTableGetEntry(ib->brtab, &rlocator, forknum,
 									 &limit_block);
diff --git a/src/backend/postmaster/walsummarizer.c b/src/backend/postmaster/walsummarizer.c
index f295eff32f..8d05bac80e 100644
--- a/src/backend/postmaster/walsummarizer.c
+++ b/src/backend/postmaster/walsummarizer.c
@@ -29,6 +29,7 @@
 #include "access/xlogutils.h"
 #include "backup/walsummary.h"
 #include "catalog/storage_xlog.h"
+#include "commands/dbcommands_xlog.h"
 #include "common/blkreftable.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
@@ -148,6 +149,8 @@ static void HandleWalSummarizerInterrupts(void);
 static XLogRecPtr SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn,
 							   bool exact, XLogRecPtr switch_lsn,
 							   XLogRecPtr maximum_lsn);
+static void SummarizeDbaseRecord(XLogReaderState *xlogreader,
+								 BlockRefTable *brtab);
 static void SummarizeSmgrRecord(XLogReaderState *xlogreader,
 								BlockRefTable *brtab);
 static void SummarizeXactRecord(XLogReaderState *xlogreader,
@@ -963,6 +966,9 @@ SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact,
 		/* Special handling for particular types of WAL records. */
 		switch (XLogRecGetRmid(xlogreader))
 		{
+			case RM_DBASE_ID:
+				SummarizeDbaseRecord(xlogreader, brtab);
+				break;
 			case RM_SMGR_ID:
 				SummarizeSmgrRecord(xlogreader, brtab);
 				break;
@@ -1076,6 +1082,75 @@ SummarizeWAL(TimeLineID tli, XLogRecPtr start_lsn, bool exact,
 	return summary_end_lsn;
 }
 
+/*
+ * Special handling for WAL records with RM_DBASE_ID.
+ */
+static void
+SummarizeDbaseRecord(XLogReaderState *xlogreader, BlockRefTable *brtab)
+{
+	uint8		info = XLogRecGetInfo(xlogreader) & ~XLR_INFO_MASK;
+
+	/*
+	 * We use relfilenode zero for a given database OID and tablespace OID
+	 * to indicate that all relations with that pair of IDs have been
+	 * recreated if they exist at all. Effectively, we're setting a limit
+	 * block of 0 for all such relfilenodes.
+	 *
+	 * Technically, this special handling is only needed in the case of
+	 * XLOG_DBASE_CREATE_FILE_COPY, because that can create a whole bunch
+	 * of relation files in a directory without logging anything
+	 * specific to each one. If we didn't mark the whole DB OID/TS OID
+	 * combination in some way, then a tablespace that was dropped after
+	 * the reference backup and recreated using the FILE_COPY method prior
+	 * to the incremental backup would look just like one that was never
+	 * touched at all, which would be catastrophic.
+	 *
+	 * But it seems best to adopt this treatment for all records that drop
+	 * or create a DB OID/TS OID combination. That's similar to how we
+	 * treat the limit block for individual relations, and it's an extra
+	 * layer of safety here. We can never lose data by marking more stuff
+	 * as needing to be backed up in full.
+	 */
+	if (info == XLOG_DBASE_CREATE_FILE_COPY)
+	{
+		xl_dbase_create_file_copy_rec *xlrec;
+		RelFileLocator rlocator;
+
+		xlrec =
+			(xl_dbase_create_file_copy_rec *) XLogRecGetData(xlogreader);
+		rlocator.spcOid = xlrec->tablespace_id;
+		rlocator.dbOid = xlrec->db_id;
+		rlocator.relNumber = 0;
+		BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
+	}
+	else if (info == XLOG_DBASE_CREATE_WAL_LOG)
+	{
+		xl_dbase_create_wal_log_rec *xlrec;
+		RelFileLocator rlocator;
+
+		xlrec = (xl_dbase_create_wal_log_rec *) XLogRecGetData(xlogreader);
+		rlocator.spcOid = xlrec->tablespace_id;
+		rlocator.dbOid = xlrec->db_id;
+		rlocator.relNumber = 0;
+		BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
+	}
+	else if (info == XLOG_DBASE_DROP)
+	{
+		xl_dbase_drop_rec *xlrec;
+		RelFileLocator rlocator;
+		int		i;
+
+		xlrec = (xl_dbase_drop_rec *) XLogRecGetData(xlogreader);
+		rlocator.dbOid = xlrec->db_id;
+		rlocator.relNumber = 0;
+		for (i = 0; i < xlrec->ntablespaces; ++i)
+		{
+			rlocator.spcOid = xlrec->tablespace_ids[i];
+			BlockRefTableSetLimitBlock(brtab, &rlocator, MAIN_FORKNUM, 0);
+		}
+	}
+}
+
 /*
  * Special handling for WAL records with RM_SMGR_ID.
  */
diff --git a/src/bin/pg_combinebackup/meson.build b/src/bin/pg_combinebackup/meson.build
index 30dbbaa6cf..1d4b9c218f 100644
--- a/src/bin/pg_combinebackup/meson.build
+++ b/src/bin/pg_combinebackup/meson.build
@@ -33,6 +33,7 @@ tests += {
       't/003_timeline.pl',
       't/004_manifest.pl',
       't/005_integrity.pl',
+      't/006_db_file_copy.pl',
     ],
   }
 }
diff --git a/src/bin/pg_combinebackup/t/006_db_file_copy.pl b/src/bin/pg_combinebackup/t/006_db_file_copy.pl
new file mode 100644
index 0000000000..24f55449c6
--- /dev/null
+++ b/src/bin/pg_combinebackup/t/006_db_file_copy.pl
@@ -0,0 +1,58 @@
+# Copyright (c) 2021-2024, PostgreSQL Global Development Group
+
+use strict;
+use warnings FATAL => 'all';
+use File::Compare;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Set up a new database instance.
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(has_archiving => 1, allows_streaming => 1);
+$primary->append_conf('postgresql.conf', 'summarize_wal = on');
+$primary->start;
+
+# Initial setup.
+$primary->safe_psql('postgres', <<EOM);
+CREATE DATABASE lakh OID = 100000 STRATEGY = FILE_COPY
+EOM
+$primary->safe_psql('lakh', <<EOM);
+CREATE TABLE t1 (a int)
+EOM
+
+# Take a full backup.
+my $backup1path = $primary->backup_dir . '/backup1';
+$primary->command_ok(
+	[ 'pg_basebackup', '-D', $backup1path, '--no-sync', '-cfast' ],
+	"full backup");
+
+# Now make some database changes.
+$primary->safe_psql('postgres', <<EOM);
+DROP DATABASE lakh;
+CREATE DATABASE lakh OID = 100000 STRATEGY = FILE_COPY
+EOM
+
+# Take an incremental backup.
+my $backup2path = $primary->backup_dir . '/backup2';
+$primary->command_ok(
+	[ 'pg_basebackup', '-D', $backup2path, '--no-sync', '-cfast',
+	  '--incremental', $backup1path . '/backup_manifest' ],
+	"incremental backup");
+
+# Recover the incremental backup.
+my $restore = PostgreSQL::Test::Cluster->new('restore');
+$restore->init_from_backup($primary, 'backup2',
+						   combine_with_prior => [ 'backup1' ]);
+$restore->start();
+
+# Query the DB.
+my $stdout;
+my $stderr;
+$restore->psql('lakh', 'SELECT * FROM t1',
+			   stdout => \$stdout, stderr => \$stderr);
+is($stdout, '', 'SELECT * FROM t1: no stdout');
+like($stderr, qr/relation "t1" does not exist/,
+	 'SELECT * FROM t1: stderr missing table');
+
+done_testing();
-- 
2.39.3 (Apple Git-145)

