From 857f48859aa8ebbe6daa5b80e2f51bfb96e3979c Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokolatos@pm.me>
Date: Tue, 29 Jun 2021 14:27:51 +0000
Subject: Teach pg_receivewal to use lz4 compression

---
 src/bin/pg_basebackup/pg_basebackup.c        |   7 +-
 src/bin/pg_basebackup/pg_receivewal.c        |  68 ++++++-
 src/bin/pg_basebackup/t/020_pg_receivewal.pl |  38 +++-
 src/bin/pg_basebackup/walmethods.c           | 175 +++++++++++++++++--
 src/bin/pg_basebackup/walmethods.h           |  12 +-
 5 files changed, 278 insertions(+), 22 deletions(-)

diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 16d8929b23..6b8734d8ba 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -553,10 +553,13 @@ LogStreamerMain(logstreamer_param *param)
 	stream.replication_slot = replication_slot;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0,
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog,
+													COMPRESSION_NONE, 0,
 													stream.do_sync);
 	else
-		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel,
+		stream.walmethod = CreateWalTarMethod(param->xlog,
+											  COMPRESSION_NONE /* argument is ignored */,
+											  compresslevel,
 											  stream.do_sync);
 
 	if (!ReceiveXlogStream(param->bgconn, &stream))
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 0d15012c29..6759e3e747 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -43,6 +43,7 @@ static bool do_drop_slot = false;
 static bool do_sync = true;
 static bool synchronous = false;
 static char *replication_slot = NULL;
+static WalCompressionProgram compression_program =	COMPRESSION_NONE;
 static XLogRecPtr endpos = InvalidXLogRecPtr;
 
 
@@ -90,7 +91,8 @@ usage(void)
 	printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
-	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
+	printf(_("  -I, --compress-program use this program for compression\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level (available only with compress-program=zlib)\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -429,7 +431,9 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+	stream.walmethod = CreateWalDirectoryMethod(basedir,
+												compression_program,
+												compresslevel,
 												stream.do_sync);
 	stream.partial_suffix = ".partial";
 	stream.replication_slot = replication_slot;
@@ -482,6 +486,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compress-program", required_argument, NULL, 'I'},
 		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
@@ -573,6 +578,21 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'I':
+				if (strcmp(optarg, "zlib") == 0)
+				{
+					compression_program = COMPRESSION_ZLIB;
+				}
+				else if (strcmp(optarg, "lz4") == 0)
+				{
+					compression_program = COMPRESSION_LZ4;
+				}
+				else
+				{
+					pg_log_error("invalid compress-program \"%s\"", optarg);
+					exit(1);
+				}
+				break;
 			case 'Z':
 				compresslevel = atoi(optarg);
 				if (compresslevel < 0 || compresslevel > 9)
@@ -657,14 +677,56 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (compression_program != COMPRESSION_NONE)
+	{
+#ifndef HAVE_LIBZ
+		if (compression_program == COMPRESSION_ZLIB)
+		{
+			pg_log_error("this build does not support compression via zlib");
+			exit(1);
+		}
+#endif
+#ifndef HAVE_LIBLZ4
+		if (compression_program == COMPRESSION_LZ4)
+		{
+			pg_log_error("this build does not support compression via lz4");
+			exit(1);
+		}
+#endif
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
-		pg_log_error("this build does not support compression");
+		pg_log_error("this build does not support compression via zlib");
 		exit(1);
 	}
 #endif
 
+	if (compresslevel != 0)
+	{
+		if (compression_program == COMPRESSION_NONE)
+		{
+			compression_program = COMPRESSION_ZLIB;
+		}
+		if (compression_program != COMPRESSION_ZLIB)
+		{
+			pg_log_error("cannot use --compress when "
+						 "--compress_program is not zlib");
+			fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+					progname);
+			exit(1);
+		}
+	}
+	else if (compression_program == COMPRESSION_ZLIB)
+	{
+		pg_log_error("cannot use --compress_program zlib when "
+					 "--compression is 0");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index a547c97ef1..0e27cf030c 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 19;
+use Test::More tests => 22;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -33,6 +33,13 @@ $primary->command_fails(
 $primary->command_fails(
 	[ 'pg_receivewal', '-D', $stream_dir, '--synchronous', '--no-sync' ],
 	'failure if --synchronous specified with --no-sync');
+$primary->command_fails(
+	[
+	  'pg_receivewal', '-D', $stream_dir, '--compress_program', 'lz4',
+	  '--compress', '0'
+	],
+	'failure if --compress_program=lz4 specified with --compress');
+
 
 # Slot creation and drop
 my $slot_name = 'test';
@@ -66,6 +73,35 @@ $primary->command_ok(
 	],
 	'streaming some WAL with --synchronous');
 
+# Check lz4 compression if available
+SKIP:
+{
+	skip "postgres was not build with LZ4 support", 2
+		if (!check_pg_config("#define HAVE_LIBLZ4 1"));
+
+	# Generate some WAL.
+	$primary->psql('postgres', 'SELECT pg_switch_wal();');
+	$nextlsn =
+	  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+	chomp($nextlsn);
+	$primary->psql('postgres',
+		'INSERT INTO test_table VALUES (generate_series(100,200));');
+	$primary->psql('postgres', 'SELECT pg_switch_wal();');
+
+	# Stream up to the given position
+	$primary->command_ok(
+		[
+			'pg_receivewal', '-D',     $stream_dir,     '--verbose',
+			'--endpos',      $nextlsn, '--compress-program=lz4'
+		],
+		'streaming some WAL with --compress-program=lz4');
+
+	# Verify that the stored file is compressed and readable
+	my @lz4_wals = glob "$stream_dir/*.lz4";
+	is(scalar(@lz4_wals), 1, 'one lz4 compressed WAL was created');
+	system_or_bail('lz4', '-t', $lz4_wals[0]);
+}
+
 # Permissions on WAL files should be default
 SKIP:
 {
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index a15bbb20e7..18d5bf3e59 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -17,6 +17,10 @@
 #include <sys/stat.h>
 #include <time.h>
 #include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
 #ifdef HAVE_LIBZ
 #include <zlib.h>
 #endif
@@ -30,6 +34,9 @@
 /* Size of zlib buffer for .tar.gz */
 #define ZLIB_OUT_SIZE 4096
 
+/* Size of lz4 input chunk for .lz4 */
+#define LZ4_IN_SIZE  4096
+
 /*-------------------------------------------------------------------------
  * WalDirectoryMethod - write wal to a directory looking like pg_wal
  *-------------------------------------------------------------------------
@@ -40,9 +47,10 @@
  */
 typedef struct DirectoryMethodData
 {
-	char	   *basedir;
-	int			compression;
-	bool		sync;
+	char				   *basedir;
+	WalCompressionProgram	compression_program;
+	int						compression;
+	bool					sync;
 } DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
 
@@ -59,6 +67,11 @@ typedef struct DirectoryMethodFile
 #ifdef HAVE_LIBZ
 	gzFile		gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+	LZ4F_compressionContext_t ctx;
+	size_t		outbufCapacity;
+	void	   *outbuf;
+#endif
 } DirectoryMethodFile;
 
 static const char *
@@ -77,10 +90,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 #ifdef HAVE_LIBZ
 	gzFile		gzfp = NULL;
 #endif
+#ifdef HAVE_LIBLZ4
+	LZ4F_compressionContext_t ctx = NULL;
+	size_t		outbufCapacity;
+	void	   *outbuf = NULL;
+#endif
 
 	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
 			 dir_data->basedir, pathname,
-			 dir_data->compression > 0 ? ".gz" : "",
+			 dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" :
+			 dir_data->compression_program == COMPRESSION_LZ4  ? ".lz4": "",
 			 temp_suffix ? temp_suffix : "");
 
 	/*
@@ -94,7 +113,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		return NULL;
 
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_program == COMPRESSION_ZLIB)
 	{
 		gzfp = gzdopen(fd, "wb");
 		if (gzfp == NULL)
@@ -111,9 +130,48 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		}
 	}
 #endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_program == COMPRESSION_LZ4)
+	{
+		size_t		ctx_out;
+		size_t		header_size;
+
+		ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+		outbufCapacity = LZ4F_compressBound(LZ4_IN_SIZE, NULL /* default preferences */);
+		if (LZ4F_isError(ctx_out))
+		{
+			close(fd);
+			return NULL;
+		}
+
+		outbuf = pg_malloc0(outbufCapacity);
+
+		/* add the header */
+		header_size = LZ4F_compressBegin(ctx, outbuf, outbufCapacity, NULL);
+		if (LZ4F_isError(header_size))
+		{
+			close(fd);
+			return NULL;
+		}
+
+		errno = 0;
+		if (write(fd, outbuf, header_size) != header_size)
+		{
+			int			save_errno = errno;
+
+			close(fd);
+
+			/*
+			 * If write didn't set errno, assume problem is no disk space.
+			 */
+			errno = save_errno ? save_errno : ENOSPC;
+			return NULL;
+		}
+	}
+#endif
 
 	/* Do pre-padding on non-compressed files */
-	if (pad_to_size && dir_data->compression == 0)
+	if (pad_to_size && dir_data->compression_program == COMPRESSION_NONE)
 	{
 		PGAlignedXLogBlock zerobuf;
 		int			bytes;
@@ -158,7 +216,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 			fsync_parent_path(tmppath) != 0)
 		{
 #ifdef HAVE_LIBZ
-			if (dir_data->compression > 0)
+			if (dir_data->compression_program == COMPRESSION_ZLIB)
 				gzclose(gzfp);
 			else
 #endif
@@ -169,9 +227,18 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_program == COMPRESSION_ZLIB)
 		f->gzfp = gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_program == COMPRESSION_LZ4)
+	{
+		f->ctx = ctx;
+		f->outbuf = outbuf;
+		f->outbufCapacity = outbufCapacity;
+	}
+#endif
+
 	f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
@@ -191,9 +258,46 @@ dir_write(Walfile f, const void *buf, size_t count)
 	Assert(f != NULL);
 
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_program == COMPRESSION_ZLIB)
 		r = (ssize_t) gzwrite(df->gzfp, buf, count);
 	else
+#endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_program == COMPRESSION_LZ4)
+	{
+		size_t		chunk;
+		size_t		remaining;
+		const void *inbuf = buf;
+
+		remaining = count;
+		while (remaining > 0)
+		{
+			size_t compressed;
+
+			if (remaining > LZ4_IN_SIZE)
+				chunk = LZ4_IN_SIZE;
+			else
+				chunk = remaining;
+
+			remaining -= chunk;
+			compressed = LZ4F_compressUpdate(df->ctx,
+											 df->outbuf, df->outbufCapacity,
+											 inbuf, chunk,
+											 NULL);
+
+			if (LZ4F_isError(compressed))
+				return -1;
+
+			if (write(df->fd, df->outbuf, compressed) != compressed)
+				return -1;
+
+			inbuf = ((char *)inbuf) + chunk;
+		}
+
+		/* XXX: This is what our caller expects, but it is not nice at all */
+		r = (ssize_t)count;
+	}
+	else
 #endif
 		r = write(df->fd, buf, count);
 	if (r > 0)
@@ -221,9 +325,30 @@ dir_close(Walfile f, WalCloseMethod method)
 	Assert(f != NULL);
 
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_program == COMPRESSION_ZLIB)
 		r = gzclose(df->gzfp);
 	else
+#endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_program == COMPRESSION_LZ4)
+	{
+		/* Flush any internal buffers */
+		size_t compressed = LZ4F_compressEnd(df->ctx,
+											df->outbuf, df->outbufCapacity,
+											NULL);
+		if (LZ4F_isError(compressed))
+		{
+			return -1;
+		}
+
+		if (write(df->fd, df->outbuf, compressed) != compressed)
+		{
+			return -1;
+		}
+
+		r = close(df->fd);
+	}
+	else
 #endif
 		r = close(df->fd);
 
@@ -238,11 +363,13 @@ dir_close(Walfile f, WalCloseMethod method)
 			 */
 			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
 					 dir_data->basedir, df->pathname,
-					 dir_data->compression > 0 ? ".gz" : "",
+					 dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" :
+					 dir_data->compression_program == COMPRESSION_LZ4  ? ".lz4": "",
 					 df->temp_suffix);
 			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
 					 dir_data->basedir, df->pathname,
-					 dir_data->compression > 0 ? ".gz" : "");
+					 dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" :
+					 dir_data->compression_program == COMPRESSION_LZ4  ? ".lz4": "");
 			r = durable_rename(tmppath, tmppath2);
 		}
 		else if (method == CLOSE_UNLINK)
@@ -250,7 +377,8 @@ dir_close(Walfile f, WalCloseMethod method)
 			/* Unlink the file once it's closed */
 			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
 					 dir_data->basedir, df->pathname,
-					 dir_data->compression > 0 ? ".gz" : "",
+					 dir_data->compression_program == COMPRESSION_ZLIB ? ".gz" :
+					 dir_data->compression_program == COMPRESSION_LZ4  ? ".lz4": "",
 					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
@@ -270,6 +398,12 @@ dir_close(Walfile f, WalCloseMethod method)
 		}
 	}
 
+#ifdef HAVE_LIBLZ4
+	pg_free(df->outbuf);
+	/* supports free on NULL */
+	LZ4F_freeCompressionContext(df->ctx);
+#endif
+
 	pg_free(df->pathname);
 	pg_free(df->fullpath);
 	if (df->temp_suffix)
@@ -346,7 +480,9 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
+CreateWalDirectoryMethod(const char *basedir,
+						WalCompressionProgram compression_program,
+						int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -362,6 +498,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression_program = compression_program;
 	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
@@ -983,8 +1120,16 @@ tar_finish(void)
 	return true;
 }
 
+/*
+ * The argument compression_program is currently ignored. It is in place for
+ * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
+ * between the different compression methods. CreateWalTarMethod and its family
+ * of functions handle only zlib compression.
+ */
 WalWriteMethod *
-CreateWalTarMethod(const char *tarbase, int compression, bool sync)
+CreateWalTarMethod(const char *tarbase,
+				   WalCompressionProgram compression_program,
+				   int compression, bool sync)
 {
 	WalWriteMethod *method;
 	const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index fc4bb52cb7..f7d8582aad 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -19,6 +19,13 @@ typedef enum
 	CLOSE_NO_RENAME
 } WalCloseMethod;
 
+typedef enum
+{
+	COMPRESSION_LZ4,
+	COMPRESSION_ZLIB,
+	COMPRESSION_NONE
+} WalCompressionProgram;
+
 /*
  * A WalWriteMethod structure represents the different methods used
  * to write the streaming WAL as it's received.
@@ -86,8 +93,11 @@ struct WalWriteMethod
  *						   not all those required for pg_receivewal)
  */
 WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 WalCompressionProgram compression_program,
 										 int compression, bool sync);
-WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase,
+								  WalCompressionProgram compression_program,
+								  int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
 void		FreeWalDirectoryMethod(void);
-- 
2.25.1

