From 8f44cb1723fb6ca749ac5d107152b95e2319eda8 Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokolatos@pm.me>
Date: Thu, 8 Jul 2021 13:47:37 +0000
Subject: [PATCH v2] Teach pg_receivewal to use lz4 compression

The program pg_receivewal can use gzip compression to store the received WAL.
This commit teaches it to also be able to use lz4 compression. It is required
that the binary is built using the -llz4 flag. It is enabled via the --with-lz4
flag at configure time.

Previously, the user had to use the option --compress with a value between [0-9]
to denote that gzip compression was required. This specific behaviour is
maintained. A newly introduced option --compression-method=lz4 can be used to ask
for the logs to be compressed with lz4. In that case, no compression values can
be selected as it did not seem too useful.

Under the hood there is nothing exceptional to be noted. Tar based archives have
not yet been taught to use lz4 compression. If that is felt useful, then it is
easy to be added in the future.

Tests have been added to verify the creation and correctness of the generated
lz4 files. The latter is achieved by the use of lz4 program. Autoconf has been
taught to unconditionally recognize the existence of the program and propagate
the information to the tests.
---
 configure                                    |  55 ++++++
 configure.ac                                 |   2 +
 doc/src/sgml/ref/pg_receivewal.sgml          |  28 ++-
 src/Makefile.global.in                       |   1 +
 src/bin/pg_basebackup/Makefile               |   3 +-
 src/bin/pg_basebackup/pg_basebackup.c        |   7 +-
 src/bin/pg_basebackup/pg_receivewal.c        | 196 +++++++++++++++++--
 src/bin/pg_basebackup/t/020_pg_receivewal.pl |  46 ++++-
 src/bin/pg_basebackup/walmethods.c           | 188 ++++++++++++++++--
 src/bin/pg_basebackup/walmethods.h           |  12 +-
 10 files changed, 495 insertions(+), 43 deletions(-)

diff --git a/configure b/configure
index e468def49e..4fd6652d1c 100755
--- a/configure
+++ b/configure
@@ -699,6 +699,7 @@ with_gnu_ld
 LD
 LDFLAGS_SL
 LDFLAGS_EX
+LZ4
 LZ4_LIBS
 LZ4_CFLAGS
 with_lz4
@@ -9563,6 +9564,60 @@ $as_echo_n "checking for TAR... " >&6; }
 $as_echo "$TAR" >&6; }
 fi
 
+if test -z "$LZ4"; then
+  for ac_prog in lz4
+do
+  # Extract the first word of "$ac_prog", so it can be a program name with args.
+set dummy $ac_prog; ac_word=$2
+{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for $ac_word" >&5
+$as_echo_n "checking for $ac_word... " >&6; }
+if ${ac_cv_path_LZ4+:} false; then :
+  $as_echo_n "(cached) " >&6
+else
+  case $LZ4 in
+  [\\/]* | ?:[\\/]*)
+  ac_cv_path_LZ4="$LZ4" # Let the user override the test with a path.
+  ;;
+  *)
+  as_save_IFS=$IFS; IFS=$PATH_SEPARATOR
+for as_dir in $PATH
+do
+  IFS=$as_save_IFS
+  test -z "$as_dir" && as_dir=.
+    for ac_exec_ext in '' $ac_executable_extensions; do
+  if as_fn_executable_p "$as_dir/$ac_word$ac_exec_ext"; then
+    ac_cv_path_LZ4="$as_dir/$ac_word$ac_exec_ext"
+    $as_echo "$as_me:${as_lineno-$LINENO}: found $as_dir/$ac_word$ac_exec_ext" >&5
+    break 2
+  fi
+done
+  done
+IFS=$as_save_IFS
+
+  ;;
+esac
+fi
+LZ4=$ac_cv_path_LZ4
+if test -n "$LZ4"; then
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: $LZ4" >&5
+$as_echo "$LZ4" >&6; }
+else
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: no" >&5
+$as_echo "no" >&6; }
+fi
+
+
+  test -n "$LZ4" && break
+done
+
+else
+  # Report the value of LZ4 in configure's output in all cases.
+  { $as_echo "$as_me:${as_lineno-$LINENO}: checking for lz4" >&5
+$as_echo_n "checking for lz4... " >&6; }
+  { $as_echo "$as_me:${as_lineno-$LINENO}: result: $LZ4" >&5
+$as_echo "$LZ4" >&6; }
+fi
+
 { $as_echo "$as_me:${as_lineno-$LINENO}: checking whether ln -s works" >&5
 $as_echo_n "checking whether ln -s works... " >&6; }
 LN_S=$as_ln_s
diff --git a/configure.ac b/configure.ac
index 39666f9727..d6db7d1b80 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1053,6 +1053,8 @@ case $MKDIR_P in
   *install-sh*) MKDIR_P='\${SHELL} \${top_srcdir}/config/install-sh -c -d';;
 esac
 
+PGAC_PATH_PROGS(LZ4, lz4)
+
 PGAC_PATH_BISON
 PGAC_PATH_FLEX
 
diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 45b544cf49..1b49884247 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -229,15 +229,35 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--compression-method=<replaceable class="parameter">method</replaceable></option></term>
+      <listitem>
+       <para>
+        Enables compression of write-ahead logs using the specified method.
+        Supported methods are <literal>lz4</literal> and
+        <literal>gzip</literal>.
+        The suffix <filename>.lz4</filename> or <filename>.gz</filename> will
+        automatically be added to all filenames for each method respectively.
+        For the <literal>lz4</literal> method to be available,
+        <productname>PostgreSQL</productname> must have been compiled
+        with <option>--with-lz4</option>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry>
       <term><option>-Z <replaceable class="parameter">level</replaceable></option></term>
       <term><option>--compress=<replaceable class="parameter">level</replaceable></option></term>
       <listitem>
        <para>
-        Enables gzip compression of write-ahead logs, and specifies the
-        compression level (0 through 9, 0 being no compression and 9 being best
-        compression).  The suffix <filename>.gz</filename> will
-        automatically be added to all filenames.
+        Specifies the compression level (0 through 9, 0 being no compression and
+        9 being best compression).  If no <option>--compression-method</option>
+        is specified, it implies <literal>gzip</literal> compression method.
+       </para>
+
+       <para>
+        This option is not available when <option>--compression-method</option>
+        is specified as <literal>lz4</literal>.
        </para>
       </listitem>
      </varlistentry>
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 6e2f224cc4..91ba2240a2 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -341,6 +341,7 @@ perl_embed_ldflags	= @perl_embed_ldflags@
 
 AWK	= @AWK@
 LN_S	= @LN_S@
+LZ4		= @LZ4@
 MSGFMT  = @MSGFMT@
 MSGFMT_FLAGS = @MSGFMT_FLAGS@
 MSGMERGE = @MSGMERGE@
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 66e0070f1a..7950d2843e 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -18,8 +18,9 @@ subdir = src/bin/pg_basebackup
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-# make this available to TAP test scripts
+# make these available to TAP test scripts
 export TAR
+export LZ4
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
 LDFLAGS_INTERNAL += -L$(top_builddir)/src/fe_utils -lpgfeutils $(libpq_pgport)
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 8bb0acf498..8cc73718a4 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -553,10 +553,13 @@ LogStreamerMain(logstreamer_param *param)
 	stream.replication_slot = replication_slot;
 
 	if (format == 'p')
-		stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0,
+		stream.walmethod = CreateWalDirectoryMethod(param->xlog,
+													COMPRESSION_NONE, 0,
 													stream.do_sync);
 	else
-		stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel,
+		stream.walmethod = CreateWalTarMethod(param->xlog,
+											  COMPRESSION_NONE /* argument is ignored */,
+											  compresslevel,
 											  stream.do_sync);
 
 	if (!ReceiveXlogStream(param->bgconn, &stream))
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index c1334fad35..673b34df51 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -27,6 +27,10 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
+#ifdef HAVE_LIBLZ4
+#include "lz4frame.h"
+#endif
+
 /* Time to sleep between reconnection attempts */
 #define RECONNECT_SLEEP_TIME 5
 
@@ -43,6 +47,7 @@ static bool do_drop_slot = false;
 static bool do_sync = true;
 static bool synchronous = false;
 static char *replication_slot = NULL;
+static WalCompressionMethod compression_method =	COMPRESSION_NONE;
 static XLogRecPtr endpos = InvalidXLogRecPtr;
 
 
@@ -62,14 +67,22 @@ disconnect_atexit(void)
 }
 
 /* Routines to evaluate segment file format */
-#define IsCompressXLogFileName(fname)	 \
+#define IsZlibCompressXLogFileName(fname)	 \
 	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
 	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
 	 strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
-#define IsPartialCompressXLogFileName(fname)	\
+#define IsZlibPartialCompressXLogFileName(fname)	\
 	(strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
 	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
 	 strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
+#define IsLZ4CompressXLogFileName(fname)	 \
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".lz4") && \
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".lz4") == 0)
+#define IsLZ4PartialCompressXLogFileName(fname)	\
+	(strlen(fname) == XLOG_FNAME_LEN + strlen(".lz4.partial") && \
+	 strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&		\
+	 strcmp((fname) + XLOG_FNAME_LEN, ".lz4.partial") == 0)
 
 static void
 usage(void)
@@ -90,7 +103,10 @@ usage(void)
 	printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
 	printf(_("  -v, --verbose          output verbose messages\n"));
 	printf(_("  -V, --version          output version information, then exit\n"));
-	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
+	printf(_("      --compression-method=METHOD\n"
+			 "                         use this method for compression\n"));
+	printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"
+			 "                         (available only with --compression-method=gzip)\n"));
 	printf(_("  -?, --help             show this help, then exit\n"));
 	printf(_("\nConnection options:\n"));
 	printf(_("  -d, --dbname=CONNSTR   connection string\n"));
@@ -212,7 +228,8 @@ FindStreamingStart(uint32 *tli)
 		uint32		tli;
 		XLogSegNo	segno;
 		bool		ispartial;
-		bool		iscompress;
+		bool		iszlibcompress;
+		bool		islz4compress;
 
 		/*
 		 * Check if the filename looks like an xlog file, or a .partial file.
@@ -220,22 +237,38 @@ FindStreamingStart(uint32 *tli)
 		if (IsXLogFileName(dirent->d_name))
 		{
 			ispartial = false;
-			iscompress = false;
+			iszlibcompress = false;
+			islz4compress = false;
 		}
 		else if (IsPartialXLogFileName(dirent->d_name))
 		{
 			ispartial = true;
-			iscompress = false;
+			iszlibcompress = false;
+			islz4compress = false;
 		}
-		else if (IsCompressXLogFileName(dirent->d_name))
+		else if (IsZlibCompressXLogFileName(dirent->d_name))
 		{
 			ispartial = false;
-			iscompress = true;
+			iszlibcompress = true;
+			islz4compress = false;
 		}
-		else if (IsPartialCompressXLogFileName(dirent->d_name))
+		else if (IsZlibPartialCompressXLogFileName(dirent->d_name))
 		{
 			ispartial = true;
-			iscompress = true;
+			iszlibcompress = true;
+			islz4compress = false;
+		}
+		else if (IsLZ4CompressXLogFileName(dirent->d_name))
+		{
+			ispartial = false;
+			islz4compress = true;
+			iszlibcompress = false;
+		}
+		else if (IsLZ4PartialCompressXLogFileName(dirent->d_name))
+		{
+			ispartial = true;
+			islz4compress = true;
+			iszlibcompress = false;
 		}
 		else
 			continue;
@@ -248,14 +281,15 @@ FindStreamingStart(uint32 *tli)
 		/*
 		 * Check that the segment has the right size, if it's supposed to be
 		 * completed.  For non-compressed segments just check the on-disk size
-		 * and see if it matches a completed segment. For compressed segments,
-		 * look at the last 4 bytes of the compressed file, which is where the
-		 * uncompressed size is located for gz files with a size lower than
-		 * 4GB, and then compare it to the size of a completed segment. The 4
-		 * last bytes correspond to the ISIZE member according to
+		 * and see if it matches a completed segment. For zlib compressed
+		 * segments, look at the last 4 bytes of the compressed file, which is
+		 * where the uncompressed size is located for gz files with a size lower
+		 * than 4GB, and then compare it to the size of a completed segment.
+		 * The 4 last bytes correspond to the ISIZE member according to
 		 * http://www.zlib.org/rfc-gzip.html.
+		 * For lz4 segments, compare the frame header's content size instead.
 		 */
-		if (!ispartial && !iscompress)
+		if (!ispartial && !iszlibcompress && !islz4compress)
 		{
 			struct stat statbuf;
 			char		fullpath[MAXPGPATH * 2];
@@ -274,7 +308,7 @@ FindStreamingStart(uint32 *tli)
 				continue;
 			}
 		}
-		else if (!ispartial && iscompress)
+		else if (!ispartial && iszlibcompress)
 		{
 			int			fd;
 			char		buf[4];
@@ -320,6 +354,81 @@ FindStreamingStart(uint32 *tli)
 				continue;
 			}
 		}
+		else if (!ispartial && islz4compress)
+		{
+			/*
+			 * Validate a completed lz4 segment: the frame header must carry
+			 * a content size equal to the size of a completed WAL segment.
+			 */
+#ifdef HAVE_LIBLZ4
+			int			fd;
+			int			r;
+			size_t		consumed_len = LZ4F_HEADER_SIZE_MAX;
+			char	    buf[LZ4F_HEADER_SIZE_MAX];
+			char		fullpath[MAXPGPATH * 2];
+			LZ4F_frameInfo_t frame_info = { 0 };
+			LZ4F_decompressionContext_t ctx = NULL;
+
+			snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+			fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
+			if (fd < 0)
+			{
+				pg_log_error("could not open compressed file \"%s\": %m",
+							 fullpath);
+				exit(1);
+			}
+
+			/* Only the leading bytes holding the frame header are needed. */
+			r = read(fd, buf, sizeof(buf));
+			if (r != sizeof(buf))
+			{
+				if (r < 0)
+					pg_log_error("could not read compressed file \"%s\": %m",
+								 fullpath);
+				else
+					pg_log_error("could not read compressed file \"%s\": read %d of %zu",
+								 fullpath, r, sizeof(buf));
+				exit(1);
+			}
+
+			/* Done with the file; do not leak one descriptor per segment. */
+			close(fd);
+
+			if (LZ4F_isError(LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION)))
+			{
+				pg_log_error("lz4 internal error");
+				exit(1);
+			}
+
+			/*
+			 * frame_info and consumed_len are local copies filled in by the
+			 * call, so the context can be released before inspecting them.
+			 */
+			LZ4F_getFrameInfo(ctx, &frame_info, (void *)buf, &consumed_len);
+			LZ4F_freeDecompressionContext(ctx);
+
+			if (consumed_len <= LZ4F_HEADER_SIZE_MIN ||
+				consumed_len >= LZ4F_HEADER_SIZE_MAX)
+			{
+				pg_log_warning("compressed segment file \"%s\" has incorrect header size %zu, skipping",
+							   dirent->d_name, consumed_len);
+				continue;
+			}
+
+			if (frame_info.contentSize != WalSegSz)
+			{
+				pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %llu, skipping",
+							   dirent->d_name, frame_info.contentSize);
+				continue;
+			}
+#else
+			pg_log_error("cannot verify lz4 compressed segment file \"%s\", "
+						 "this program was not built with lz4 support",
+						 dirent->d_name);
+			exit(1);
+#endif
+		}
 
 		/* Looks like a valid segment. Remember that we saw it. */
 		if ((segno > high_segno) ||
@@ -429,7 +527,9 @@ StreamLog(void)
 	stream.synchronous = synchronous;
 	stream.do_sync = do_sync;
 	stream.mark_done = false;
-	stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
+	stream.walmethod = CreateWalDirectoryMethod(basedir,
+												compression_method,
+												compresslevel,
 												stream.do_sync);
 	stream.partial_suffix = ".partial";
 	stream.replication_slot = replication_slot;
@@ -482,6 +582,7 @@ main(int argc, char **argv)
 		{"status-interval", required_argument, NULL, 's'},
 		{"slot", required_argument, NULL, 'S'},
 		{"verbose", no_argument, NULL, 'v'},
+		{"compression-method", required_argument, NULL, 'I'},
 		{"compress", required_argument, NULL, 'Z'},
 /* action */
 		{"create-slot", no_argument, NULL, 1},
@@ -573,6 +674,21 @@ main(int argc, char **argv)
 			case 'v':
 				verbose++;
 				break;
+			case 'I':
+				if (strcmp(optarg, "gzip") == 0)
+				{
+					compression_method = COMPRESSION_ZLIB;
+				}
+				else if (strcmp(optarg, "lz4") == 0)
+				{
+					compression_method = COMPRESSION_LZ4;
+				}
+				else
+				{
+					pg_log_error("invalid compression-method \"%s\"", optarg);
+					exit(1);
+				}
+				break;
 			case 'Z':
 				compresslevel = atoi(optarg);
 				if (compresslevel < 0 || compresslevel > 9)
@@ -657,14 +773,56 @@ main(int argc, char **argv)
 		exit(1);
 	}
 
+	if (compression_method != COMPRESSION_NONE)
+	{
+#ifndef HAVE_LIBZ
+		if (compression_method == COMPRESSION_ZLIB)
+		{
+			pg_log_error("this build does not support compression via gzip");
+			exit(1);
+		}
+#endif
+#ifndef HAVE_LIBLZ4
+		if (compression_method == COMPRESSION_LZ4)
+		{
+			pg_log_error("this build does not support compression via lz4");
+			exit(1);
+		}
+#endif
+	}
+
 #ifndef HAVE_LIBZ
 	if (compresslevel != 0)
 	{
-		pg_log_error("this build does not support compression");
+		pg_log_error("this build does not support compression via gzip");
 		exit(1);
 	}
 #endif
 
+	if (compresslevel != 0)
+	{
+		/* A bare --compress keeps its historical meaning of gzip. */
+		if (compression_method == COMPRESSION_NONE)
+			compression_method = COMPRESSION_ZLIB;
+		if (compression_method != COMPRESSION_ZLIB)
+		{
+			pg_log_error("cannot use --compress when "
+						 "--compression-method is not gzip");
+			fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+					progname);
+			exit(1);
+		}
+	}
+	else if (compression_method == COMPRESSION_ZLIB)
+	{
+		/* gzip was requested explicitly but the compression level is 0 */
+		pg_log_error("cannot use --compression-method gzip when "
+					 "--compress is 0");
+		fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+				progname);
+		exit(1);
+	}
+
 	/*
 	 * Check existence of destination folder.
 	 */
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index a547c97ef1..d688cb1899 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use TestLib;
 use PostgresNode;
-use Test::More tests => 19;
+use Test::More tests => 23;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -33,6 +33,13 @@ $primary->command_fails(
 $primary->command_fails(
 	[ 'pg_receivewal', '-D', $stream_dir, '--synchronous', '--no-sync' ],
 	'failure if --synchronous specified with --no-sync');
+$primary->command_fails(
+	[
+	  'pg_receivewal', '-D', $stream_dir, '--compression-method', 'lz4',
+	  '--compress', '1'
+	],
+	'failure if --compression-method=lz4 specified with --compress');
+
 
 # Slot creation and drop
 my $slot_name = 'test';
@@ -66,6 +73,43 @@ $primary->command_ok(
 	],
 	'streaming some WAL with --synchronous');
 
+# Check lz4 compression if available
+SKIP:
+{
+	my $lz4 = $ENV{LZ4};
+
+	skip "postgres was not build with LZ4 support", 3
+		if (!check_pg_config("#define HAVE_LIBLZ4 1"));
+
+	# Generate some WAL.
+	$primary->psql('postgres', 'SELECT pg_switch_wal();');
+	$nextlsn =
+	  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+	chomp($nextlsn);
+	$primary->psql('postgres',
+		'INSERT INTO test_table VALUES (generate_series(100,200));');
+	$primary->psql('postgres', 'SELECT pg_switch_wal();');
+
+	# Stream up to the given position
+	$primary->command_ok(
+		[
+			'pg_receivewal', '-D',     $stream_dir,     '--verbose',
+			'--endpos',      $nextlsn, '--compression-method=lz4'
+		],
+		'streaming some WAL with --compression-method=lz4');
+
+	# Verify that the stored file is compressed
+	my @lz4_wals = glob "$stream_dir/*.lz4";
+	is(scalar(@lz4_wals), 1, 'one lz4 compressed WAL was created');
+
+	# Verify that the stored file is readable if program lz4 is available
+	skip "program lz4 is not found in your system", 1
+	  if (!defined $lz4 || $lz4 eq '');
+
+	my $can_decode = system_log($lz4, '-t', $lz4_wals[0]);
+	is($can_decode, 0, "program lz4 can decode compressed WAL");
+}
+
 # Permissions on WAL files should be default
 SKIP:
 {
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index a15bbb20e7..640abc83aa 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -17,6 +17,10 @@
 #include <sys/stat.h>
 #include <time.h>
 #include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
 #ifdef HAVE_LIBZ
 #include <zlib.h>
 #endif
@@ -30,6 +34,9 @@
 /* Size of zlib buffer for .tar.gz */
 #define ZLIB_OUT_SIZE 4096
 
+/* Size of lz4 input chunk for .lz4 */
+#define LZ4_IN_SIZE  4096
+
 /*-------------------------------------------------------------------------
  * WalDirectoryMethod - write wal to a directory looking like pg_wal
  *-------------------------------------------------------------------------
@@ -40,9 +47,10 @@
  */
 typedef struct DirectoryMethodData
 {
-	char	   *basedir;
-	int			compression;
-	bool		sync;
+	char				   *basedir;
+	WalCompressionMethod	compression_method;
+	int						compression;
+	bool					sync;
 } DirectoryMethodData;
 static DirectoryMethodData *dir_data = NULL;
 
@@ -59,6 +67,11 @@ typedef struct DirectoryMethodFile
 #ifdef HAVE_LIBZ
 	gzFile		gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+	LZ4F_compressionContext_t ctx;
+	size_t		lz4bufsize;
+	void	   *lz4buf;
+#endif
 } DirectoryMethodFile;
 
 static const char *
@@ -77,10 +90,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 #ifdef HAVE_LIBZ
 	gzFile		gzfp = NULL;
 #endif
+#ifdef HAVE_LIBLZ4
+	LZ4F_compressionContext_t ctx = NULL;
+	size_t		lz4bufsize = 0;
+	void	   *lz4buf = NULL;
+#endif
 
 	snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
 			 dir_data->basedir, pathname,
-			 dir_data->compression > 0 ? ".gz" : "",
+			 dir_data->compression_method == COMPRESSION_ZLIB ? ".gz" :
+			 dir_data->compression_method == COMPRESSION_LZ4  ? ".lz4": "",
 			 temp_suffix ? temp_suffix : "");
 
 	/*
@@ -94,7 +113,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		return NULL;
 
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_method == COMPRESSION_ZLIB)
 	{
 		gzfp = gzdopen(fd, "wb");
 		if (gzfp == NULL)
@@ -111,9 +130,68 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		}
 	}
 #endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		LZ4F_preferences_t lz4preferences = { 0 };
+		size_t		ctx_out;
+		size_t		header_size;
+
+		/*
+		 * All preferences keep their zero-initialized defaults except for
+		 * contentSize, which is recorded in the frame header because
+		 * FindStreamingStart needs it.
+		 */
+		lz4preferences.frameInfo.contentSize = (unsigned long long)WalSegSz;
+
+		ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+		if (LZ4F_isError(ctx_out))
+		{
+			close(fd);
+			return NULL;
+		}
+
+		/* Output buffer sized for one LZ4_IN_SIZE chunk of input. */
+		lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, &lz4preferences);
+		lz4buf = pg_malloc0(lz4bufsize);
+
+		/*
+		 * XXX: this is crap... lz4preferences now does show the uncompressed
+		 * size via lz4 --list <filename> but the compression goes down the
+		 * window... also it is not very helpful to have it at the start, does
+		 * it?
+		 */
+		/* add the header */
+		header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, &lz4preferences);
+		if (LZ4F_isError(header_size))
+		{
+			/* Release what was set up above before bailing out. */
+			LZ4F_freeCompressionContext(ctx);
+			pg_free(lz4buf);
+			close(fd);
+			return NULL;
+		}
+
+		errno = 0;
+		if (write(fd, lz4buf, header_size) != header_size)
+		{
+			int			save_errno = errno;
+
+			LZ4F_freeCompressionContext(ctx);
+			pg_free(lz4buf);
+			close(fd);
+
+			/*
+			 * If write didn't set errno, assume problem is no disk space.
+			 */
+			errno = save_errno ? save_errno : ENOSPC;
+			return NULL;
+		}
+	}
+#endif
 
 	/* Do pre-padding on non-compressed files */
-	if (pad_to_size && dir_data->compression == 0)
+	if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
 	{
 		PGAlignedXLogBlock zerobuf;
 		int			bytes;
@@ -158,7 +229,7 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 			fsync_parent_path(tmppath) != 0)
 		{
 #ifdef HAVE_LIBZ
-			if (dir_data->compression > 0)
+			if (dir_data->compression_method == COMPRESSION_ZLIB)
 				gzclose(gzfp);
 			else
 #endif
@@ -169,9 +240,18 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 
 	f = pg_malloc0(sizeof(DirectoryMethodFile));
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_method == COMPRESSION_ZLIB)
 		f->gzfp = gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		f->ctx = ctx;
+		f->lz4buf = lz4buf;
+		f->lz4bufsize = lz4bufsize;
+	}
+#endif
+
 	f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
@@ -191,9 +271,46 @@ dir_write(Walfile f, const void *buf, size_t count)
 	Assert(f != NULL);
 
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_method == COMPRESSION_ZLIB)
 		r = (ssize_t) gzwrite(df->gzfp, buf, count);
 	else
+#endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		size_t		chunk;
+		size_t		remaining;
+		const void *inbuf = buf;
+
+		remaining = count;
+		while (remaining > 0)
+		{
+			size_t compressed;
+
+			if (remaining > LZ4_IN_SIZE)
+				chunk = LZ4_IN_SIZE;
+			else
+				chunk = remaining;
+
+			remaining -= chunk;
+			compressed = LZ4F_compressUpdate(df->ctx,
+											 df->lz4buf, df->lz4bufsize,
+											 inbuf, chunk,
+											 NULL);
+
+			if (LZ4F_isError(compressed))
+				return -1;
+
+			if (write(df->fd, df->lz4buf, compressed) != compressed)
+				return -1;
+
+			inbuf = ((char *)inbuf) + chunk;
+		}
+
+		/* XXX: This is what our caller expects, but it is not nice at all */
+		r = (ssize_t)count;
+	}
+	else
 #endif
 		r = write(df->fd, buf, count);
 	if (r > 0)
@@ -221,9 +338,30 @@ dir_close(Walfile f, WalCloseMethod method)
 	Assert(f != NULL);
 
 #ifdef HAVE_LIBZ
-	if (dir_data->compression > 0)
+	if (dir_data->compression_method == COMPRESSION_ZLIB)
 		r = gzclose(df->gzfp);
 	else
+#endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		/* Flush any internal buffers */
+		size_t compressed = LZ4F_compressEnd(df->ctx,
+											df->lz4buf, df->lz4bufsize,
+											NULL);
+		if (LZ4F_isError(compressed))
+		{
+			return -1;
+		}
+
+		if (write(df->fd, df->lz4buf, compressed) != compressed)
+		{
+			return -1;
+		}
+
+		r = close(df->fd);
+	}
+	else
 #endif
 		r = close(df->fd);
 
@@ -238,11 +376,13 @@ dir_close(Walfile f, WalCloseMethod method)
 			 */
 			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
 					 dir_data->basedir, df->pathname,
-					 dir_data->compression > 0 ? ".gz" : "",
+					 dir_data->compression_method == COMPRESSION_ZLIB ? ".gz" :
+					 dir_data->compression_method == COMPRESSION_LZ4  ? ".lz4": "",
 					 df->temp_suffix);
 			snprintf(tmppath2, sizeof(tmppath2), "%s/%s%s",
 					 dir_data->basedir, df->pathname,
-					 dir_data->compression > 0 ? ".gz" : "");
+					 dir_data->compression_method == COMPRESSION_ZLIB ? ".gz" :
+					 dir_data->compression_method == COMPRESSION_LZ4  ? ".lz4": "");
 			r = durable_rename(tmppath, tmppath2);
 		}
 		else if (method == CLOSE_UNLINK)
@@ -250,7 +390,8 @@ dir_close(Walfile f, WalCloseMethod method)
 			/* Unlink the file once it's closed */
 			snprintf(tmppath, sizeof(tmppath), "%s/%s%s%s",
 					 dir_data->basedir, df->pathname,
-					 dir_data->compression > 0 ? ".gz" : "",
+					 dir_data->compression_method == COMPRESSION_ZLIB ? ".gz" :
+					 dir_data->compression_method == COMPRESSION_LZ4  ? ".lz4": "",
 					 df->temp_suffix ? df->temp_suffix : "");
 			r = unlink(tmppath);
 		}
@@ -270,6 +411,12 @@ dir_close(Walfile f, WalCloseMethod method)
 		}
 	}
 
+#ifdef HAVE_LIBLZ4
+	pg_free(df->lz4buf);
+	/* supports free on NULL */
+	LZ4F_freeCompressionContext(df->ctx);
+#endif
+
 	pg_free(df->pathname);
 	pg_free(df->fullpath);
 	if (df->temp_suffix)
@@ -346,7 +493,9 @@ dir_finish(void)
 
 
 WalWriteMethod *
-CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
+CreateWalDirectoryMethod(const char *basedir,
+						WalCompressionMethod compression_method,
+						int compression, bool sync)
 {
 	WalWriteMethod *method;
 
@@ -362,6 +511,7 @@ CreateWalDirectoryMethod(const char *basedir, int compression, bool sync)
 	method->getlasterror = dir_getlasterror;
 
 	dir_data = pg_malloc0(sizeof(DirectoryMethodData));
+	dir_data->compression_method = compression_method;
 	dir_data->compression = compression;
 	dir_data->basedir = pg_strdup(basedir);
 	dir_data->sync = sync;
@@ -983,8 +1133,16 @@ tar_finish(void)
 	return true;
 }
 
+/*
+ * The argument compression_method is currently ignored. It is in place for
+ * symmetry with CreateWalDirectoryMethod which uses it for distinguishing
+ * between the different compression methods. CreateWalTarMethod and its family
+ * of functions handle only zlib compression.
+ */
 WalWriteMethod *
-CreateWalTarMethod(const char *tarbase, int compression, bool sync)
+CreateWalTarMethod(const char *tarbase,
+				   WalCompressionMethod compression_method,
+				   int compression, bool sync)
 {
 	WalWriteMethod *method;
 	const char *suffix = (compression != 0) ? ".tar.gz" : ".tar";
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index fc4bb52cb7..b5998a08bc 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -19,6 +19,13 @@ typedef enum
 	CLOSE_NO_RENAME
 } WalCloseMethod;
 
+typedef enum
+{
+	COMPRESSION_NONE,			/* no compression; zero so zeroed storage means "none" */
+	COMPRESSION_ZLIB,			/* gzip, written through zlib */
+	COMPRESSION_LZ4				/* lz4 frame format */
+} WalCompressionMethod;
+
 /*
  * A WalWriteMethod structure represents the different methods used
  * to write the streaming WAL as it's received.
@@ -86,8 +93,11 @@ struct WalWriteMethod
  *						   not all those required for pg_receivewal)
  */
 WalWriteMethod *CreateWalDirectoryMethod(const char *basedir,
+										 WalCompressionMethod compression_method,
 										 int compression, bool sync);
-WalWriteMethod *CreateWalTarMethod(const char *tarbase, int compression, bool sync);
+WalWriteMethod *CreateWalTarMethod(const char *tarbase,
+								  WalCompressionMethod compression_method,
+								  int compression, bool sync);
 
 /* Cleanup routines for previously-created methods */
 void		FreeWalDirectoryMethod(void);
-- 
2.25.1

