From a6a0dbecd155cc1ded5ce59e84cf8827676e9b42 Mon Sep 17 00:00:00 2001
From: Robert Haas <rhaas@postgresql.org>
Date: Fri, 5 Nov 2021 10:05:02 -0400
Subject: [PATCH v10 4/4] Server-side gzip compression.

pg_basebackup now has a --server-compression option, which can be
set to 'none' (the default), 'gzip', or 'gzipN' where N is a digit
between 1 and 9. If set to 'gzip' or 'gzipN' it will compress the
generated tar files on the server side using 'gzip', either at the
default compression level or a the compression level specified by N.

At present, pg_basebackup cannot decompress .gz files, so the
--server-compression option will cause a failure if (1) -Ft is not
used or (2) -R is used or (3) -D- is used without --no-manifest.

Patch by me, with a bug fix by Jeevan Ladhe.
---
 doc/src/sgml/ref/pg_basebackup.sgml       |  29 ++-
 src/backend/Makefile                      |   2 +-
 src/backend/replication/Makefile          |   1 +
 src/backend/replication/basebackup.c      |  39 +++
 src/backend/replication/basebackup_gzip.c | 304 ++++++++++++++++++++++
 src/bin/pg_basebackup/pg_basebackup.c     |  43 ++-
 src/include/replication/basebackup_sink.h |   1 +
 7 files changed, 415 insertions(+), 4 deletions(-)
 create mode 100644 src/backend/replication/basebackup_gzip.c

diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index 165a9ea5cc..9ce8b8d89d 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -224,6 +224,31 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--server-compression=<replaceable class="parameter">target</replaceable></option></term>
+      <listitem>
+
+       <para>
+        Allows the tar files generated for each tablespace to be compressed
+        on the server, before they are sent to the client. The default value
+        is <literal>none</literal>, which performs no compression. If set
+        to <literal>gzip</literal>, compression is performed using gzip and
+        the suffix <filename>.gz</filename> will automatically be added to
+        compressed files. A numeric digit between 1 and 9 can be added to
+        specify the compression level; for instance, <literal>gzip9</literal>
+        will provide the maximum compression that the <literal>gzip</literal>
+        algorithm can provide.
+       </para>
+       <para>
+        Since the write-ahead logs are fetched via a separate client
+        connection, they cannot be compressed using this option. See also
+        the <literal>--gzip</literal> and <literal>--compress</literal>
+        options.
+       </para>
+
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-t <replaceable class="parameter">target</replaceable></option></term>
       <term><option>--target=<replaceable class="parameter">target</replaceable></option></term>
@@ -405,7 +430,9 @@ PostgreSQL documentation
         compression level (0 through 9, 0 being no compression and 9 being best
         compression). Compression is only available when using the tar
         format, and the suffix <filename>.gz</filename> will
-        automatically be added to all tar filenames.
+        automatically be added to all tar filenames. When this option is
+        used, compression is performed on the client side;
+        see also <literal>--server-compression</literal>.
        </para>
       </listitem>
      </varlistentry>
diff --git a/src/backend/Makefile b/src/backend/Makefile
index 0da848b1fd..3af216ddfc 100644
--- a/src/backend/Makefile
+++ b/src/backend/Makefile
@@ -48,7 +48,7 @@ OBJS = \
 LIBS := $(filter-out -lpgport -lpgcommon, $(LIBS)) $(LDAP_LIBS_BE) $(ICU_LIBS)
 
 # The backend doesn't need everything that's in LIBS, however
-LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
+LIBS := $(filter-out -lreadline -ledit -ltermcap -lncurses -lcurses, $(LIBS))
 
 ifeq ($(with_systemd),yes)
 LIBS += -lsystemd
diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile
index a8f4757f0c..8ec60ded76 100644
--- a/src/backend/replication/Makefile
+++ b/src/backend/replication/Makefile
@@ -18,6 +18,7 @@ OBJS = \
 	backup_manifest.o \
 	basebackup.o \
 	basebackup_copy.o \
+	basebackup_gzip.o \
 	basebackup_progress.o \
 	basebackup_server.o \
 	basebackup_sink.o \
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 7f37630e6c..ff26537679 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -61,6 +61,12 @@ typedef enum
 	BACKUP_TARGET_SERVER
 } backup_target_type;
 
+typedef enum
+{
+	BACKUP_COMPRESSION_NONE,
+	BACKUP_COMPRESSION_GZIP
+} basebackup_compression_type;
+
 typedef struct
 {
 	const char *label;
@@ -73,6 +79,8 @@ typedef struct
 	backup_target_type target;
 	char	   *target_detail;
 	backup_manifest_option manifest;
+	basebackup_compression_type	compression;
+	int			compression_level;
 	pg_checksum_type manifest_checksum_type;
 } basebackup_options;
 
@@ -707,11 +715,13 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 	bool		o_target = false;
 	bool		o_target_detail = false;
 	char	   *target_str = "compat";	/* placate compiler */
+	bool		o_compression = false;
 
 	MemSet(opt, 0, sizeof(*opt));
 	opt->target = BACKUP_TARGET_COMPAT;
 	opt->manifest = MANIFEST_OPTION_NO;
 	opt->manifest_checksum_type = CHECKSUM_TYPE_CRC32C;
+	opt->compression = BACKUP_COMPRESSION_NONE;
 
 	foreach(lopt, options)
 	{
@@ -881,6 +891,31 @@ parse_basebackup_options(List *options, basebackup_options *opt)
 			opt->target_detail = optval;
 			o_target_detail = true;
 		}
+		else if (strcmp(defel->defname, "compression") == 0)
+		{
+			char	   *optval = defGetString(defel);
+
+			if (o_compression)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("duplicate option \"%s\"", defel->defname)));
+			if (strcmp(optval, "none") == 0)
+				opt->compression = BACKUP_COMPRESSION_NONE;
+			else if (strcmp(optval, "gzip") == 0)
+				opt->compression = BACKUP_COMPRESSION_GZIP;
+			else if (strlen(optval) == 5 && strncmp(optval, "gzip", 4) == 0 &&
+					 optval[4] >= '1' && optval[4] <= '9')
+			{
+				opt->compression = BACKUP_COMPRESSION_GZIP;
+				opt->compression_level = optval[4] - '0';
+			}
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("unrecognized compression algorithm: \"%s\"",
+								optval)));
+			o_compression = true;
+		}
 	}
 	if (opt->label == NULL)
 		opt->label = "base backup";
@@ -975,6 +1010,10 @@ SendBaseBackup(BaseBackupCmd *cmd)
 	if (opt.maxrate > 0)
 		sink = bbsink_throttle_new(sink, opt.maxrate);
 
+	/* Set up server-side compression, if client requested it */
+	if (opt.compression == BACKUP_COMPRESSION_GZIP)
+		sink = bbsink_gzip_new(sink, opt.compression_level);
+
 	/* Set up progress reporting. */
 	sink = bbsink_progress_new(sink, opt.progress);
 
diff --git a/src/backend/replication/basebackup_gzip.c b/src/backend/replication/basebackup_gzip.c
new file mode 100644
index 0000000000..432423bd55
--- /dev/null
+++ b/src/backend/replication/basebackup_gzip.c
@@ -0,0 +1,304 @@
+/*-------------------------------------------------------------------------
+ *
+ * basebackup_gzip.c
+ *	  Basebackup sink implementing gzip compression.
+ *
+ * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/replication/basebackup_gzip.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#ifdef HAVE_LIBZ
+#include <zlib.h>
+#endif
+
+#include "replication/basebackup_sink.h"
+
+#ifdef HAVE_LIBZ
+typedef struct bbsink_gzip
+{
+	/* Common information for all types of sink. */
+	bbsink		base;
+
+	/* Compression level. */
+	int			compresslevel;
+
+	/* Compressed data stream. */
+	z_stream	zstream;
+
+	/* Number of bytes staged in output buffer. */
+	size_t		bytes_written;
+} bbsink_gzip;
+
+static void bbsink_gzip_begin_backup(bbsink *sink);
+static void bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name);
+static void bbsink_gzip_archive_contents(bbsink *sink, size_t len);
+static void bbsink_gzip_manifest_contents(bbsink *sink, size_t len);
+static void bbsink_gzip_end_archive(bbsink *sink);
+static void *gzip_palloc(void *opaque, unsigned items, unsigned size);
+static void gzip_pfree(void *opaque, void *address);
+
+const bbsink_ops bbsink_gzip_ops = {
+	.begin_backup = bbsink_gzip_begin_backup,
+	.begin_archive = bbsink_gzip_begin_archive,
+	.archive_contents = bbsink_gzip_archive_contents,
+	.end_archive = bbsink_gzip_end_archive,
+	.begin_manifest = bbsink_forward_begin_manifest,
+	.manifest_contents = bbsink_gzip_manifest_contents,
+	.end_manifest = bbsink_forward_end_manifest,
+	.end_backup = bbsink_forward_end_backup,
+	.cleanup = bbsink_forward_cleanup
+};
+#endif
+
+/*
+ * Create a new basebackup sink that performs gzip compression using the
+ * designated compression level.
+ */
+bbsink *
+bbsink_gzip_new(bbsink *next, int compresslevel)
+{
+#ifndef HAVE_LIBZ
+	ereport(ERROR,
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+			 errmsg("gzip compression is not supported by this build")));
+#else
+	bbsink_gzip *sink;
+
+	Assert(next != NULL);
+	Assert(compresslevel >= 0 && compresslevel <= 9);
+
+	if (compresslevel == 0)
+		compresslevel = Z_DEFAULT_COMPRESSION;
+
+	sink = palloc0(sizeof(bbsink_gzip));
+	*((const bbsink_ops **) &sink->base.bbs_ops) = &bbsink_gzip_ops;
+	sink->base.bbs_next = next;
+	sink->compresslevel = compresslevel;
+
+	return &sink->base;
+#endif
+}
+
+#ifdef HAVE_LIBZ
+
+/*
+ * Begin backup.
+ */
+static void
+bbsink_gzip_begin_backup(bbsink *sink)
+{
+	/*
+	 * We need our own buffer, because we're going to pass different data to
+	 * the next sink than what gets passed to us.
+	 */
+	sink->bbs_buffer = palloc(sink->bbs_buffer_length);
+
+	/*
+	 * Since deflate() doesn't require the output buffer to be of any
+	 * particular size, we can just make it the same size as the input buffer.
+	 */
+	bbsink_begin_backup(sink->bbs_next, sink->bbs_state,
+						sink->bbs_buffer_length);
+}
+
+/*
+ * Prepare to compress the next archive.
+ */
+static void
+bbsink_gzip_begin_archive(bbsink *sink, const char *archive_name)
+{
+	bbsink_gzip *mysink = (bbsink_gzip *) sink;
+	char *gz_archive_name;
+	z_stream *zs = &mysink->zstream;
+
+	/* Initialize compressor object. */
+	memset(zs, 0, sizeof(z_stream));
+	zs->zalloc = gzip_palloc;
+	zs->zfree = gzip_pfree;
+	zs->next_out = (uint8 *) sink->bbs_next->bbs_buffer;
+	zs->avail_out = sink->bbs_next->bbs_buffer_length;
+
+	/*
+	 * We need to use deflateInit2() rather than deflateInit() here so that
+	 * we can request a gzip header rather than a zlib header. Otherwise, we
+	 * want to supply the same values that would have been used by default
+	 * if we had just called deflateInit().
+	 *
+	 * Per the documentation for deflateInit2, the third argument must be
+	 * Z_DEFLATED; the fourth argument is the number of "window bits", by
+	 * default 15, but adding 16 gets you a gzip header rather than a zlib
+	 * header; the fifth argument controls memory usage, and 8 is the default;
+	 * and likewise Z_DEFAULT_STRATEGY is the default for the sixth argument.
+	 */
+	if (deflateInit2(zs, mysink->compresslevel, Z_DEFLATED, 15 + 16, 8,
+					 Z_DEFAULT_STRATEGY) != Z_OK)
+		ereport(ERROR,
+				errcode(ERRCODE_INTERNAL_ERROR),
+				errmsg("could not initialize compression library"));
+
+	/*
+	 * Add ".gz" to the archive name. Note that the pg_basebackup -z
+	 * produces archives named ".tar.gz" rather than ".tgz", so we match
+	 * that here.
+	 */
+	gz_archive_name = psprintf("%s.gz", archive_name);
+	Assert(sink->bbs_next != NULL);
+	bbsink_begin_archive(sink->bbs_next, gz_archive_name);
+	pfree(gz_archive_name);
+}
+
+/*
+ * Compress the input data to the output buffer until we run out of input
+ * data. Each time the output buffer fills up, invoke the archive_contents()
+ * method for then next sink.
+ *
+ * Note that since we're compressing the input, it may very commonly happen
+ * that we consume all the input data without filling the output buffer. In
+ * that case, the compressed representation of the current input data won't
+ * actually be sent to the next bbsink until a later call to this function,
+ * or perhaps even not until bbsink_gzip_end_archive() is invoked.
+ */
+static void
+bbsink_gzip_archive_contents(bbsink *sink, size_t len)
+{
+	bbsink_gzip *mysink = (bbsink_gzip *) sink;
+	z_stream *zs = &mysink->zstream;
+
+	/* Compress data from input buffer. */
+	zs->next_in = (uint8 *) mysink->base.bbs_buffer;
+	zs->avail_in = len;
+
+	while (zs->avail_in > 0)
+	{
+		int		res;
+
+		/* Write output data into unused portion of output buffer. */
+		Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
+		zs->next_out = (uint8 *)
+			mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
+		zs->avail_out =
+			mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
+
+		/*
+		 * Try to compress. Note that this will update zs->next_in and
+		 * zs->avail_in according to how much input data was consumed, and
+		 * zs->next_out and zs->avail_out according to how many output bytes
+		 * were produced.
+		 *
+		 * According to the zlib documentation, Z_STREAM_ERROR should only
+		 * occur if we've made a programming error, or if say there's been a
+		 * memory clobber; we use elog() rather than Assert() here out of an
+		 * abundance of caution.
+		 */
+		res = deflate(zs, Z_NO_FLUSH);
+		if (res == Z_STREAM_ERROR)
+			elog(ERROR, "could not compress data: %s", zs->msg);
+
+		/* Update our notion of how many bytes we've written. */
+		mysink->bytes_written =
+			mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
+
+		/*
+		 * If the output buffer is full, it's time for the next sink to
+		 * process the contents.
+		 */
+		if (mysink->bytes_written >= mysink->base.bbs_next->bbs_buffer_length)
+		{
+			bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+			mysink->bytes_written = 0;
+		}
+	}
+}
+
+/*
+ * There might be some data inside zlib's internal buffers; we need to get
+ * that flushed out and forwarded to the successor sink as archive content.
+ *
+ * Then we can end processing for this archive.
+ */
+static void
+bbsink_gzip_end_archive(bbsink *sink)
+{
+	bbsink_gzip *mysink = (bbsink_gzip *) sink;
+	z_stream *zs = &mysink->zstream;
+
+	/* There is no more data available. */
+	zs->next_in = (uint8 *) mysink->base.bbs_buffer;
+	zs->avail_in = 0;
+
+	while (1)
+	{
+		int		res;
+
+		/* Write output data into unused portion of output buffer. */
+		Assert(mysink->bytes_written < mysink->base.bbs_next->bbs_buffer_length);
+		zs->next_out = (uint8 *)
+			mysink->base.bbs_next->bbs_buffer + mysink->bytes_written;
+		zs->avail_out =
+			mysink->base.bbs_next->bbs_buffer_length - mysink->bytes_written;
+
+		/*
+		 * As bbsink_gzip_archive_contents, but pass Z_FINISH since there
+		 * is no more input.
+		 */
+		res = deflate(zs, Z_FINISH);
+		if (res == Z_STREAM_ERROR)
+			elog(ERROR, "could not compress data: %s", zs->msg);
+
+		/* Update our notion of how many bytes we've written. */
+		mysink->bytes_written =
+			mysink->base.bbs_next->bbs_buffer_length - zs->avail_out;
+
+		/*
+		 * Apparently we had no data in the output buffer and deflate()
+		 * was not able to add any. We must be done.
+		 */
+		if (mysink->bytes_written == 0)
+			break;
+
+		/* Send whatever accumulated output bytes we have. */
+		bbsink_archive_contents(sink->bbs_next, mysink->bytes_written);
+		mysink->bytes_written = 0;
+	}
+
+	/* Must also pass on the information that this archive has ended. */
+	bbsink_forward_end_archive(sink);
+}
+
+/*
+ * Manifest contents are not compressed, but we do need to copy them into
+ * the successor sink's buffer, because we have our own.
+ */
+static void
+bbsink_gzip_manifest_contents(bbsink *sink, size_t len)
+{
+	memcpy(sink->bbs_next->bbs_buffer, sink->bbs_buffer, len);
+	bbsink_manifest_contents(sink->bbs_next, len);
+}
+
+/*
+ * Wrapper function to adjust the signature of palloc to match what libz
+ * expects.
+ */
+static void *
+gzip_palloc(void *opaque, unsigned items, unsigned size)
+{
+	return palloc(items * size);
+}
+
+/*
+ * Wrapper function to adjust the signature of pfree to match what libz
+ * expects.
+ */
+static void
+gzip_pfree(void *opaque, void *address)
+{
+	pfree(address);
+}
+
+#endif
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 4c9498c368..b76e00818f 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -139,6 +139,7 @@ static bool verify_checksums = true;
 static bool manifest = true;
 static bool manifest_force_encode = false;
 static char *manifest_checksums = NULL;
+static char *server_compression = NULL;
 
 static bool success = false;
 static bool made_new_pgdata = false;
@@ -373,13 +374,15 @@ usage(void)
 			 "                         (in kB/s, or use suffix \"k\" or \"M\")\n"));
 	printf(_("  -R, --write-recovery-conf\n"
 			 "                         write configuration for replication\n"));
+	printf(_("      --server-compression=none|gzip|gzip[1-9]\n"
+			 "                         compress backup on server\n"));
 	printf(_("  -T, --tablespace-mapping=OLDDIR=NEWDIR\n"
 			 "                         relocate tablespace in OLDDIR to NEWDIR\n"));
 	printf(_("      --waldir=WALDIR    location for the write-ahead log directory\n"));
 	printf(_("  -X, --wal-method=none|fetch|stream\n"
 			 "                         include required WAL files with specified method\n"));
-	printf(_("  -z, --gzip             compress tar output\n"));
-	printf(_("  -Z, --compress=0-9     compress tar output with given compression level\n"));
+	printf(_("  -z, --gzip             compress tar output on client\n"));
+	printf(_("  -Z, --compress=0-9     compress tar output on client with given compression level\n"));
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -c, --checkpoint=fast|spread\n"
 			 "                         set fast or spread checkpointing\n"));
@@ -998,7 +1001,9 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	bbstreamer *streamer;
 	bbstreamer *manifest_inject_streamer = NULL;
 	bool		inject_manifest;
+	bool		is_tar;
 	bool		must_parse_archive;
+	int			archive_name_len = strlen(archive_name);
 
 	/*
 	 * Normally, we emit the backup manifest as a separate file, but when
@@ -1007,13 +1012,32 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 	 */
 	inject_manifest = (format == 't' && strcmp(basedir, "-") == 0 && manifest);
 
+	/* Is this a tar archive? */
+	is_tar = (archive_name_len > 4 &&
+			  strcmp(archive_name + archive_name_len - 4, ".tar") == 0);
+
 	/*
 	 * We have to parse the archive if (1) we're suppose to extract it, or if
 	 * (2) we need to inject backup_manifest or recovery configuration into it.
+	 * However, we only know how to parse tar archives.
 	 */
 	must_parse_archive = (format == 'p' || inject_manifest ||
 		(spclocation == NULL && writerecoveryconf));
 
+	/* At present, we only know how to parse tar archives. */
+	if (must_parse_archive && !is_tar)
+	{
+		pg_log_error("unable to parse archive: %s", archive_name);
+		pg_log_info("only tar archives can be parsed");
+		if (format == 'p')
+			pg_log_info("plain format requires pg_basebackup to parse the archive");
+		if (inject_manifest)
+			pg_log_info("using - as the output directory requires pg_basebackup to parse the archive");
+		if (writerecoveryconf)
+			pg_log_info("the -R option requires pg_basebackup to parse the archive");
+		exit(1);
+	}
+
 	if (format == 'p')
 	{
 		const char *directory;
@@ -1753,6 +1777,17 @@ BaseBackup(void)
 		AppendStringCommandOption(&buf, use_new_option_syntax,
 								  "TARGET", "client");
 
+	if (server_compression != NULL)
+	{
+		if (!use_new_option_syntax)
+		{
+			pg_log_error("server does not support server-side compression");
+			exit(1);
+		}
+		AppendStringCommandOption(&buf, use_new_option_syntax,
+									 "COMPRESSION", server_compression);
+	}
+
 	if (verbose)
 		pg_log_info("initiating base backup, waiting for checkpoint to complete");
 
@@ -2163,6 +2198,7 @@ main(int argc, char **argv)
 		{"no-manifest", no_argument, NULL, 5},
 		{"manifest-force-encode", no_argument, NULL, 6},
 		{"manifest-checksums", required_argument, NULL, 7},
+		{"server-compression", required_argument, NULL, 8},
 		{NULL, 0, NULL, 0}
 	};
 	int			c;
@@ -2342,6 +2378,9 @@ main(int argc, char **argv)
 			case 7:
 				manifest_checksums = pg_strdup(optarg);
 				break;
+			case 8:
+				server_compression = pg_strdup(optarg);
+				break;
 			default:
 
 				/*
diff --git a/src/include/replication/basebackup_sink.h b/src/include/replication/basebackup_sink.h
index 0e337a86f4..6bfea35c22 100644
--- a/src/include/replication/basebackup_sink.h
+++ b/src/include/replication/basebackup_sink.h
@@ -284,6 +284,7 @@ extern void bbsink_forward_cleanup(bbsink *sink);
 /* Constructors for various types of sinks. */
 extern bbsink *bbsink_copystream_new(bool send_to_client);
 extern bbsink *bbsink_copytblspc_new(void);
+extern bbsink *bbsink_gzip_new(bbsink *next, int compresslevel);
 extern bbsink *bbsink_progress_new(bbsink *next, bool estimate_backup_size);
 extern bbsink *bbsink_server_new(bbsink *next, char *pathname);
 extern bbsink *bbsink_throttle_new(bbsink *next, uint32 maxrate);
-- 
2.24.3 (Apple Git-128)

