‐‐‐‐‐‐‐ Original Message ‐‐‐‐‐‐‐

On Thursday, November 4th, 2021 at 9:21 AM, Michael Paquier 
<mich...@paquier.xyz> wrote:
> > +#ifdef HAVE_LIBLZ4
> > +    while (readp < readend)
> > +    {
> > +        size_t        read_size = 1;
> > +        size_t        out_size = 1;
> > +
> > +        status = LZ4F_decompress(ctx, outbuf, &out_size,
> > +                                 readbuf, &read_size, NULL);
>
> And...  It happens that the error from v9 is here, where we need to
> read the amount of remaining data from "readp", and not "readbuf" :)
>
> It is already late here, I'll continue on this stuff tomorrow, but
> this looks rather committable overall.

Thank you for v11 of the patch. Please find attached v12 which addresses a few
minor points.

Added an Oxford comma since the list now contains three or more terms:
-        <option>--with-lz4</option>) and <literal>none</literal>.
+        <option>--with-lz4</option>), and <literal>none</literal>.

Removed an extra conditional check while switching over compression_method.
Instead of:
        +       case COMPRESSION_LZ4:
        +#ifdef HAVE_LIBLZ4
        +           if (compresslevel != 0)
        +           {
        +               pg_log_error("cannot use --compress with --compression-method=%s",
        +                            "lz4");
        +               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
        +                       progname);
        +               exit(1);
        +           }
        +#else
        +           if (compression_method == COMPRESSION_LZ4)
        +           {
        +               pg_log_error("this build does not support compression with %s",
        +                            "LZ4");
        +               exit(1);
        +           }
        +           break;
        +#endif

I opted for:
        +       case COMPRESSION_LZ4:
        +#ifdef HAVE_LIBLZ4
        +           if (compresslevel != 0)
        +           {
        +               pg_log_error("cannot use --compress with --compression-method=%s",
        +                            "lz4");
        +               fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
        +                       progname);
        +               exit(1);
        +           }
        +#else
        +           pg_log_error("this build does not support compression with %s",
        +                        "LZ4");
        +           exit(1);
        +#endif
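
To illustrate why the inner check is redundant, here is a minimal standalone
sketch. It is not the patch code: check_lz4_options() is a hypothetical helper
that returns a message instead of calling pg_log_error() and exit(), and
HAVE_LIBLZ4 is assumed to be the macro defined by configure. Once control is
inside the COMPRESSION_LZ4 case, the #else branch can fail unconditionally.

```c
#include <assert.h>
#include <stddef.h>

typedef enum
{
	COMPRESSION_LZ4,
	COMPRESSION_GZIP,
	COMPRESSION_NONE
} WalCompressionMethod;

/*
 * Hypothetical helper, for illustration only: returns NULL when the option
 * combination is acceptable, or a static error message otherwise.
 */
static const char *
check_lz4_options(WalCompressionMethod method, int compresslevel)
{
	switch (method)
	{
		case COMPRESSION_LZ4:
#ifdef HAVE_LIBLZ4
			if (compresslevel != 0)
				return "cannot use --compress with --compression-method=lz4";
#else
			(void) compresslevel;
			/* Already inside the LZ4 case, so no need to re-test method. */
			return "this build does not support compression with LZ4";
#endif
			break;
		default:
			break;
	}

	return NULL;
}
```

In a build without HAVE_LIBLZ4, check_lz4_options(COMPRESSION_LZ4, 0) returns
the "does not support" message, while the other methods are unaffected.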

There was an error while trying to find the streaming start. The code read:
+ else if (!ispartial && compression_method == COMPRESSION_LZ4)

where it should be instead:
+ else if (!ispartial && wal_compression_method == COMPRESSION_LZ4)

because compression_method is the global option exposed to the user, whereas
wal_compression_method is the local variable used to figure out what kind of
file the function is currently working with. This error has existed since at
least v9-0002 of $subject.
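
As a rough illustration of the distinction, the per-file method is derived from
the file name alone, independently of what the user asked for on the command
line. The sketch below is a simplified, table-driven stand-in for
is_xlogfilename(); classify_wal_file() and its suffix table are hypothetical,
and XLOG_FNAME_LEN is assumed to be 24.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define XLOG_FNAME_LEN 24		/* assumed length of a WAL segment name */

typedef enum
{
	COMPRESSION_LZ4,
	COMPRESSION_GZIP,
	COMPRESSION_NONE
} WalCompressionMethod;

/*
 * Hypothetical helper: classify a directory entry by its suffix.  The result
 * describes the file on disk, not the --compression-method option.
 */
static bool
classify_wal_file(const char *filename, bool *ispartial,
				  WalCompressionMethod *wal_compression_method)
{
	static const struct
	{
		const char *suffix;
		bool		partial;
		WalCompressionMethod method;
	}			known[] = {
		{"", false, COMPRESSION_NONE},
		{".gz", false, COMPRESSION_GZIP},
		{".lz4", false, COMPRESSION_LZ4},
		{".partial", true, COMPRESSION_NONE},
		{".gz.partial", true, COMPRESSION_GZIP},
		{".lz4.partial", true, COMPRESSION_LZ4},
	};
	size_t		i;

	/* The name must start with exactly 24 upper-case hex digits */
	if (strspn(filename, "0123456789ABCDEF") != XLOG_FNAME_LEN)
		return false;

	for (i = 0; i < sizeof(known) / sizeof(known[0]); i++)
	{
		if (strcmp(filename + XLOG_FNAME_LEN, known[i].suffix) == 0)
		{
			*ispartial = known[i].partial;
			*wal_compression_method = known[i].method;
			return true;
		}
	}

	/* File does not look like something we know */
	return false;
}
```

A directory written with gzip and later resumed with lz4 can contain both kinds
of files, which is why FindStreamingStart() has to branch on the per-file value.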

The variables readbuf and outbuf, used in the decompression of LZ4 files, are
now heap allocated.

Last, while the following is correct:
+           /*
+            * Once we have read enough data to cover one segment, we are
+            * done, there is no need to do more.
+            */
+           while (uncompressed_size <= WalSegSz)

I felt that converting it to a do {} while () loop instead would help with
readability:
+           do
+           {
<snip>
+           /*
+            * No need to continue reading the file when the uncompressed_size
+            * exceeds WalSegSz, even if there are still data left to read.
+            * However, if uncompressed_size is equal to WalSegSz, it should
+            * verify that there is no more data to read.
+            */
+           } while (r > 0 && uncompressed_size <= WalSegSz);

Of course, the check:
+               /* Done reading the file */
+               if (r == 0)
+                   break;
midway through the loop is no longer needed and has thus been removed.
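
The termination behaviour of that loop condition can be sketched in isolation.
This is only a model: FakeStream and fake_read() are hypothetical stand-ins for
the read() plus LZ4F_decompress() pair, and they hand back already
"uncompressed" byte counts.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stream of total_len uncompressed bytes, consumed in chunks */
typedef struct
{
	size_t		total_len;
	size_t		pos;
} FakeStream;

/* Returns the number of bytes obtained, or 0 at end of stream */
static size_t
fake_read(FakeStream *s, size_t chunk)
{
	size_t		left = s->total_len - s->pos;
	size_t		n = (left < chunk) ? left : chunk;

	s->pos += n;
	return n;
}

/* Returns the uncompressed size accumulated when the loop stops */
static size_t
measure(FakeStream *s, size_t chunk, size_t seg_size)
{
	size_t		r;
	size_t		uncompressed_size = 0;

	do
	{
		r = fake_read(s, chunk);
		uncompressed_size += r;

		/*
		 * Stop at end of stream (r == 0) or once the size exceeds seg_size.
		 * When the size equals seg_size, iterate once more to confirm that
		 * nothing is left to read.
		 */
	} while (r > 0 && uncompressed_size <= seg_size);

	return uncompressed_size;
}
```

With a chunk of 4 and a segment size of 16, a 16-byte stream yields 16 after one
extra read that returns 0, while a 40-byte stream stops at 20 without reading
the rest.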

I would like to have a bit more test coverage for FindStreamingStart(),
specifically for the case where an LZ4-compressed segment larger than
WalSegSz exists. The attached patch does not contain such a test case. I am
not certain how to create one reliably, as it would mostly rely on a warning
emitted while parsing such a file.

Cheers,
//Georgios

> --
> Michael
From 48720e7c6ba771c45d43dc9f5e6833f8bb6715e6 Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokola...@pm.me>
Date: Thu, 4 Nov 2021 16:05:21 +0000
Subject: [PATCH v12] Teach pg_receivewal to use LZ4 compression

The program pg_receivewal can use gzip compression to store the received
WAL.  This commit teaches it to also use LZ4 compression.  This requires
that the binary be built using the -llz4 flag, which is enabled via the
--with-lz4 flag at configuration time.

The option `--compression-method` has been expanded to include the value
`lz4`.  The option `--compress` cannot be used with LZ4 compression.

Under the hood there is nothing exceptional to note. Tar-based
archives have not yet been taught to use LZ4 compression. If that is
deemed useful, it can easily be added in the future.

Tests have been added to verify the creation and correctness of the
generated LZ4 files. The latter is achieved by running the lz4 program, if
present in the installation.
---
 doc/src/sgml/ref/pg_receivewal.sgml          |   8 +-
 src/Makefile.global.in                       |   1 +
 src/bin/pg_basebackup/Makefile               |   1 +
 src/bin/pg_basebackup/pg_receivewal.c        | 156 ++++++++++++++++++
 src/bin/pg_basebackup/t/020_pg_receivewal.pl |  72 ++++++++-
 src/bin/pg_basebackup/walmethods.c           | 160 ++++++++++++++++++-
 src/bin/pg_basebackup/walmethods.h           |   1 +
 7 files changed, 387 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/ref/pg_receivewal.sgml b/doc/src/sgml/ref/pg_receivewal.sgml
index 79a4436ab9..5de80f8c64 100644
--- a/doc/src/sgml/ref/pg_receivewal.sgml
+++ b/doc/src/sgml/ref/pg_receivewal.sgml
@@ -268,13 +268,15 @@ PostgreSQL documentation
       <listitem>
        <para>
         Enables compression of write-ahead logs using the specified method.
-        Supported values <literal>gzip</literal>, and
-        <literal>none</literal>.
+        Supported values <literal>gzip</literal>, <literal>lz4</literal>
+        (if <productname>PostgreSQL</productname> was compiled with
+        <option>--with-lz4</option>), and <literal>none</literal>.
        </para>
 
        <para>
         The suffix <filename>.gz</filename> will automatically be added to
-        all filenames when using <literal>gzip</literal>
+        all filenames when using <literal>gzip</literal>, and the suffix
+        <filename>.lz4</filename> is added when using <literal>lz4</literal>.
        </para>
       </listitem>
      </varlistentry>
diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 533c12fef9..05c54b27de 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -350,6 +350,7 @@ XGETTEXT = @XGETTEXT@
 
 GZIP	= gzip
 BZIP2	= bzip2
+LZ4	= lz4
 
 DOWNLOAD = wget -O $@ --no-use-server-timestamps
 #DOWNLOAD = curl -o $@
diff --git a/src/bin/pg_basebackup/Makefile b/src/bin/pg_basebackup/Makefile
index 459d514183..fd920fc197 100644
--- a/src/bin/pg_basebackup/Makefile
+++ b/src/bin/pg_basebackup/Makefile
@@ -19,6 +19,7 @@ top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
 # make these available to TAP test scripts
+export LZ4
 export TAR
 # Note that GZIP cannot be used directly as this environment variable is
 # used by the command "gzip" to pass down options, so stick with a different
diff --git a/src/bin/pg_basebackup/pg_receivewal.c b/src/bin/pg_basebackup/pg_receivewal.c
index 8acc0fc009..1a943231ae 100644
--- a/src/bin/pg_basebackup/pg_receivewal.c
+++ b/src/bin/pg_basebackup/pg_receivewal.c
@@ -32,6 +32,10 @@
 #include "receivelog.h"
 #include "streamutil.h"
 
+#ifdef HAVE_LIBLZ4
+#include "lz4frame.h"
+#endif
+
 /* Time to sleep between reconnection attempts */
 #define RECONNECT_SLEEP_TIME 5
 
@@ -136,6 +140,15 @@ is_xlogfilename(const char *filename, bool *ispartial,
 		return true;
 	}
 
+	/* File looks like a completed LZ4-compressed WAL file */
+	if (fname_len == XLOG_FNAME_LEN + strlen(".lz4") &&
+		strcmp(filename + XLOG_FNAME_LEN, ".lz4") == 0)
+	{
+		*ispartial = false;
+		*wal_compression_method = COMPRESSION_LZ4;
+		return true;
+	}
+
 	/* File looks like a partial uncompressed WAL file */
 	if (fname_len == XLOG_FNAME_LEN + strlen(".partial") &&
 		strcmp(filename + XLOG_FNAME_LEN, ".partial") == 0)
@@ -154,6 +167,15 @@ is_xlogfilename(const char *filename, bool *ispartial,
 		return true;
 	}
 
+	/* File looks like a partial LZ4-compressed WAL file */
+	if (fname_len == XLOG_FNAME_LEN + strlen(".lz4.partial") &&
+		strcmp(filename + XLOG_FNAME_LEN, ".lz4.partial") == 0)
+	{
+		*ispartial = true;
+		*wal_compression_method = COMPRESSION_LZ4;
+		return true;
+	}
+
 	/* File does not look like something we know */
 	return false;
 }
@@ -284,6 +306,14 @@ FindStreamingStart(uint32 *tli)
 		 * than 4GB, and then compare it to the size of a completed segment.
 		 * The 4 last bytes correspond to the ISIZE member according to
 		 * http://www.zlib.org/rfc-gzip.html.
+		 *
+		 * For LZ4 compressed segments, uncompress the file in a throw-away
+		 * buffer keeping track of the uncompressed size, then compare it to
+		 * the size of a completed segment.  Per its protocol, LZ4 does not
+		 * store the uncompressed size of an object by default.  contentSize
+		 * is one possible way to do that, but we need to rely on a method
+		 * where WAL segments could have been compressed by a different
+		 * source than pg_receivewal, like an archive_command.
 		 */
 		if (!ispartial && wal_compression_method == COMPRESSION_NONE)
 		{
@@ -315,6 +345,7 @@ FindStreamingStart(uint32 *tli)
 			snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
 
 			fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
+
 			if (fd < 0)
 			{
 				pg_log_error("could not open compressed file \"%s\": %m",
@@ -350,6 +381,113 @@ FindStreamingStart(uint32 *tli)
 				continue;
 			}
 		}
+		else if (!ispartial && wal_compression_method == COMPRESSION_LZ4)
+		{
+#ifdef HAVE_LIBLZ4
+#define LZ4_CHUNK_SZ	64 * 1024	/* 64kB as maximum chunk size read */
+			int			fd;
+			ssize_t		r;
+			size_t		uncompressed_size = 0;
+			char		fullpath[MAXPGPATH * 2];
+			char	   *outbuf;
+			char	   *readbuf;
+			LZ4F_decompressionContext_t ctx = NULL;
+			LZ4F_decompressOptions_t dec_opt;
+			LZ4F_errorCode_t status;
+
+			memset(&dec_opt, 0, sizeof(dec_opt));
+			snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
+
+			fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
+			if (fd < 0)
+			{
+				pg_log_error("could not open file \"%s\": %m", fullpath);
+				exit(1);
+			}
+
+			status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
+			if (LZ4F_isError(status))
+			{
+				pg_log_error("could not create LZ4 decompression context: %s",
+							 LZ4F_getErrorName(status));
+				exit(1);
+			}
+
+			outbuf = pg_malloc0(LZ4_CHUNK_SZ);
+			readbuf = pg_malloc0(LZ4_CHUNK_SZ);
+			do
+			{
+				char	   *readp;
+				char	   *readend;
+
+				r = read(fd, readbuf, LZ4_CHUNK_SZ);
+				if (r < 0)
+				{
+					pg_log_error("could not read file \"%s\": %m", fullpath);
+					exit(1);
+				}
+
+				/* Done reading the file */
+				if (r == 0)
+					break;
+
+				/* Process one chunk */
+				readp = readbuf;
+				readend = readbuf + r;
+				while (readp < readend)
+				{
+					size_t		out_size = LZ4_CHUNK_SZ;
+					size_t		read_size = readend - readp;
+
+					memset(outbuf, 0, LZ4_CHUNK_SZ);
+					status = LZ4F_decompress(ctx, outbuf, &out_size,
+											 readp, &read_size, &dec_opt);
+					if (LZ4F_isError(status))
+					{
+						pg_log_error("could not decompress file \"%s\": %s",
+									 fullpath,
+									 LZ4F_getErrorName(status));
+						exit(1);
+					}
+
+					readp += read_size;
+					uncompressed_size += out_size;
+				}
+
+			/*
+			 * No need to continue reading the file when the uncompressed_size
+			 * exceeds WalSegSz, even if there are still data left to read.
+			 * However, if uncompressed_size is equal to WalSegSz, it should
+			 * verify that there is no more data to read.
+			 */
+			} while (r > 0 && uncompressed_size <= WalSegSz);
+
+			close(fd);
+			pg_free(outbuf);
+			pg_free(readbuf);
+
+			status = LZ4F_freeDecompressionContext(ctx);
+			if (LZ4F_isError(status))
+			{
+				pg_log_error("could not free LZ4 decompression context: %s",
+							 LZ4F_getErrorName(status));
+				exit(1);
+			}
+
+			if (uncompressed_size != WalSegSz)
+			{
+				pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %zu, skipping",
+							   dirent->d_name, uncompressed_size);
+				continue;
+			}
+#else
+			pg_log_error("could not check segment file \"%s\" compressed with LZ4",
+						 dirent->d_name);
+			pg_log_error("this build does not support compression with %s",
+						 "LZ4");
+			exit(1);
+#endif
+		}
 
 		/* Looks like a valid segment. Remember that we saw it. */
 		if ((segno > high_segno) ||
@@ -650,6 +788,8 @@ main(int argc, char **argv)
 			case 6:
 				if (pg_strcasecmp(optarg, "gzip") == 0)
 					compression_method = COMPRESSION_GZIP;
+				else if (pg_strcasecmp(optarg, "lz4") == 0)
+					compression_method = COMPRESSION_LZ4;
 				else if (pg_strcasecmp(optarg, "none") == 0)
 					compression_method = COMPRESSION_NONE;
 				else
@@ -746,6 +886,22 @@ main(int argc, char **argv)
 			pg_log_error("this build does not support compression with %s",
 						 "gzip");
 			exit(1);
+#endif
+			break;
+		case COMPRESSION_LZ4:
+#ifdef HAVE_LIBLZ4
+			if (compresslevel != 0)
+			{
+				pg_log_error("cannot use --compress with --compression-method=%s",
+							 "lz4");
+				fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
+						progname);
+				exit(1);
+			}
+#else
+			pg_log_error("this build does not support compression with %s",
+						 "LZ4");
+			exit(1);
 #endif
 			break;
 	}
diff --git a/src/bin/pg_basebackup/t/020_pg_receivewal.pl b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
index 94786f0815..43599d832b 100644
--- a/src/bin/pg_basebackup/t/020_pg_receivewal.pl
+++ b/src/bin/pg_basebackup/t/020_pg_receivewal.pl
@@ -5,7 +5,7 @@ use strict;
 use warnings;
 use PostgreSQL::Test::Utils;
 use PostgreSQL::Test::Cluster;
-use Test::More tests => 37;
+use Test::More tests => 42;
 
 program_help_ok('pg_receivewal');
 program_version_ok('pg_receivewal');
@@ -138,13 +138,69 @@ SKIP:
 		"gzip verified the integrity of compressed WAL segments");
 }
 
+# Check LZ4 compression if available
+SKIP:
+{
+	skip "postgres was not built with LZ4 support", 5
+	  if (!check_pg_config("#define HAVE_LIBLZ4 1"));
+
+	# Generate more WAL including one completed, compressed segment.
+	$primary->psql('postgres', 'SELECT pg_switch_wal();');
+	$nextlsn =
+	  $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
+	chomp($nextlsn);
+	$primary->psql('postgres', 'INSERT INTO test_table VALUES (3);');
+
+	# Stream up to the given position.
+	$primary->command_ok(
+		[
+			'pg_receivewal', '-D',
+			$stream_dir,     '--verbose',
+			'--endpos',      $nextlsn,
+			'--no-loop',     '--compression-method',
+			'lz4'
+		],
+		'streaming some WAL using --compression-method=lz4');
+
+	# Verify that the stored files are generated with their expected
+	# names.
+	my @lz4_wals = glob "$stream_dir/*.lz4";
+	is(scalar(@lz4_wals), 1,
+		"one WAL segment compressed with LZ4 was created");
+	my @lz4_partial_wals = glob "$stream_dir/*.lz4.partial";
+	is(scalar(@lz4_partial_wals),
+		1, "one partial WAL segment compressed with LZ4 was created");
+
+	# Verify that the start streaming position is computed correctly by
+	# comparing it with the partial file generated previously.  The name
+	# of the previous partial, now-completed WAL segment is updated, keeping
+	# its base number.
+	$partial_wals[0] =~ s/(\.gz)?\.partial$/.lz4/;
+	is($lz4_wals[0] eq $partial_wals[0],
+		1, "one partial WAL segment is now completed");
+	# Update the list of partial wals with the current one.
+	@partial_wals = @lz4_partial_wals;
+
+	# Check the integrity of the completed segment, if LZ4 is an available
+	# command.
+	my $lz4 = $ENV{LZ4};
+	skip "program lz4 is not found in your system", 1
+	  if ( !defined $lz4
+		|| $lz4 eq ''
+		|| system_log($lz4, '--version') != 0);
+
+	my $lz4_is_valid = system_log($lz4, '-t', @lz4_wals);
+	is($lz4_is_valid, 0,
+		"lz4 verified the integrity of compressed WAL segments");
+}
+
 # Verify that the start streaming position is computed and that the value is
-# correct regardless of whether ZLIB is available.
+# correct regardless of whether any compression is available.
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 $nextlsn =
   $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
 chomp($nextlsn);
-$primary->psql('postgres', 'INSERT INTO test_table VALUES (3);');
+$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);');
 $primary->command_ok(
 	[
 		'pg_receivewal', '-D',     $stream_dir, '--verbose',
@@ -152,7 +208,7 @@ $primary->command_ok(
 	],
 	"streaming some WAL");
 
-$partial_wals[0] =~ s/(\.gz)?.partial//;
+$partial_wals[0] =~ s/(\.gz|\.lz4)?.partial//;
 ok(-e $partial_wals[0], "check that previously partial WAL is now complete");
 
 # Permissions on WAL files should be default
@@ -190,7 +246,7 @@ my $walfile_streamed = $primary->safe_psql(
 
 # Switch to a new segment, to make sure that the segment retained by the
 # slot is still streamed.  This may not be necessary, but play it safe.
-$primary->psql('postgres', 'INSERT INTO test_table VALUES (4);');
+$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);');
 $primary->psql('postgres', 'SELECT pg_switch_wal();');
 $nextlsn =
   $primary->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
@@ -198,7 +254,7 @@ chomp($nextlsn);
 
 # Add a bit more data to accelerate the end of the next pg_receivewal
 # commands.
-$primary->psql('postgres', 'INSERT INTO test_table VALUES (5);');
+$primary->psql('postgres', 'INSERT INTO test_table VALUES (6);');
 
 # Check case where the slot does not exist.
 $primary->command_fails_like(
@@ -253,13 +309,13 @@ $standby->promote;
 # on the new timeline.
 my $walfile_after_promotion = $standby->safe_psql('postgres',
 	"SELECT pg_walfile_name(pg_current_wal_insert_lsn());");
-$standby->psql('postgres', 'INSERT INTO test_table VALUES (6);');
+$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);');
 $standby->psql('postgres', 'SELECT pg_switch_wal();');
 $nextlsn =
   $standby->safe_psql('postgres', 'SELECT pg_current_wal_insert_lsn();');
 chomp($nextlsn);
 # This speeds up the operation.
-$standby->psql('postgres', 'INSERT INTO test_table VALUES (7);');
+$standby->psql('postgres', 'INSERT INTO test_table VALUES (8);');
 
 # Now try to resume from the slot after the promotion.
 my $timeline_dir = $primary->basedir . '/timeline_wal';
diff --git a/src/bin/pg_basebackup/walmethods.c b/src/bin/pg_basebackup/walmethods.c
index 52f314af3b..f1ba2a828a 100644
--- a/src/bin/pg_basebackup/walmethods.c
+++ b/src/bin/pg_basebackup/walmethods.c
@@ -17,6 +17,10 @@
 #include <sys/stat.h>
 #include <time.h>
 #include <unistd.h>
+
+#ifdef HAVE_LIBLZ4
+#include <lz4frame.h>
+#endif
 #ifdef HAVE_LIBZ
 #include <zlib.h>
 #endif
@@ -30,6 +34,9 @@
 /* Size of zlib buffer for .tar.gz */
 #define ZLIB_OUT_SIZE 4096
 
+/* Size of LZ4 input chunk for .lz4 */
+#define LZ4_IN_SIZE  4096
+
 /*-------------------------------------------------------------------------
  * WalDirectoryMethod - write wal to a directory looking like pg_wal
  *-------------------------------------------------------------------------
@@ -60,6 +67,11 @@ typedef struct DirectoryMethodFile
 #ifdef HAVE_LIBZ
 	gzFile		gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+	LZ4F_compressionContext_t ctx;
+	size_t		lz4bufsize;
+	void	   *lz4buf;
+#endif
 } DirectoryMethodFile;
 
 static const char *
@@ -76,7 +88,8 @@ dir_get_file_name(const char *pathname, const char *temp_suffix)
 
 	snprintf(filename, MAXPGPATH, "%s%s%s",
 			 pathname,
-			 dir_data->compression_method == COMPRESSION_GZIP ? ".gz" : "",
+			 dir_data->compression_method == COMPRESSION_GZIP ? ".gz" :
+			 dir_data->compression_method == COMPRESSION_LZ4 ? ".lz4" : "",
 			 temp_suffix ? temp_suffix : "");
 
 	return filename;
@@ -92,6 +105,11 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 #ifdef HAVE_LIBZ
 	gzFile		gzfp = NULL;
 #endif
+#ifdef HAVE_LIBLZ4
+	LZ4F_compressionContext_t ctx = NULL;
+	size_t		lz4bufsize = 0;
+	void	   *lz4buf = NULL;
+#endif
 
 	filename = dir_get_file_name(pathname, temp_suffix);
 	snprintf(tmppath, sizeof(tmppath), "%s/%s",
@@ -126,6 +144,50 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 		}
 	}
 #endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		size_t		ctx_out;
+		size_t		header_size;
+
+		ctx_out = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION);
+		if (LZ4F_isError(ctx_out))
+		{
+			close(fd);
+			return NULL;
+		}
+
+		lz4bufsize = LZ4F_compressBound(LZ4_IN_SIZE, NULL);
+		lz4buf = pg_malloc0(lz4bufsize);
+
+		/* add the header */
+		header_size = LZ4F_compressBegin(ctx, lz4buf, lz4bufsize, NULL);
+		if (LZ4F_isError(header_size))
+		{
+			(void) LZ4F_freeCompressionContext(ctx);
+			pg_free(lz4buf);
+			close(fd);
+			return NULL;
+		}
+
+		errno = 0;
+		if (write(fd, lz4buf, header_size) != header_size)
+		{
+			int			save_errno = errno;
+
+			(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+			(void) LZ4F_freeCompressionContext(ctx);
+			pg_free(lz4buf);
+			close(fd);
+
+			/*
+			 * If write didn't set errno, assume problem is no disk space.
+			 */
+			errno = save_errno ? save_errno : ENOSPC;
+			return NULL;
+		}
+	}
+#endif
 
 	/* Do pre-padding on non-compressed files */
 	if (pad_to_size && dir_data->compression_method == COMPRESSION_NONE)
@@ -176,6 +238,16 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 			if (dir_data->compression_method == COMPRESSION_GZIP)
 				gzclose(gzfp);
 			else
+#endif
+#ifdef HAVE_LIBLZ4
+			if (dir_data->compression_method == COMPRESSION_LZ4)
+			{
+				(void) LZ4F_compressEnd(ctx, lz4buf, lz4bufsize, NULL);
+				(void) LZ4F_freeCompressionContext(ctx);
+				pg_free(lz4buf);
+				close(fd);
+			}
+			else
 #endif
 				close(fd);
 			return NULL;
@@ -187,6 +259,15 @@ dir_open_for_write(const char *pathname, const char *temp_suffix, size_t pad_to_
 	if (dir_data->compression_method == COMPRESSION_GZIP)
 		f->gzfp = gzfp;
 #endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		f->ctx = ctx;
+		f->lz4buf = lz4buf;
+		f->lz4bufsize = lz4bufsize;
+	}
+#endif
+
 	f->fd = fd;
 	f->currpos = 0;
 	f->pathname = pg_strdup(pathname);
@@ -209,6 +290,43 @@ dir_write(Walfile f, const void *buf, size_t count)
 	if (dir_data->compression_method == COMPRESSION_GZIP)
 		r = (ssize_t) gzwrite(df->gzfp, buf, count);
 	else
+#endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		size_t		chunk;
+		size_t		remaining;
+		const void *inbuf = buf;
+
+		remaining = count;
+		while (remaining > 0)
+		{
+			size_t		compressed;
+
+			if (remaining > LZ4_IN_SIZE)
+				chunk = LZ4_IN_SIZE;
+			else
+				chunk = remaining;
+
+			remaining -= chunk;
+			compressed = LZ4F_compressUpdate(df->ctx,
+											 df->lz4buf, df->lz4bufsize,
+											 inbuf, chunk,
+											 NULL);
+
+			if (LZ4F_isError(compressed))
+				return -1;
+
+			if (write(df->fd, df->lz4buf, compressed) != compressed)
+				return -1;
+
+			inbuf = ((char *) inbuf) + chunk;
+		}
+
+		/* Our caller keeps track of the uncompressed size. */
+		r = (ssize_t) count;
+	}
+	else
 #endif
 		r = write(df->fd, buf, count);
 	if (r > 0)
@@ -239,6 +357,25 @@ dir_close(Walfile f, WalCloseMethod method)
 	if (dir_data->compression_method == COMPRESSION_GZIP)
 		r = gzclose(df->gzfp);
 	else
+#endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		size_t		compressed;
+
+		compressed = LZ4F_compressEnd(df->ctx,
+									  df->lz4buf, df->lz4bufsize,
+									  NULL);
+
+		if (LZ4F_isError(compressed))
+			return -1;
+
+		if (write(df->fd, df->lz4buf, compressed) != compressed)
+			return -1;
+
+		r = close(df->fd);
+	}
+	else
 #endif
 		r = close(df->fd);
 
@@ -293,6 +430,12 @@ dir_close(Walfile f, WalCloseMethod method)
 		}
 	}
 
+#ifdef HAVE_LIBLZ4
+	pg_free(df->lz4buf);
+	/* supports free on NULL */
+	LZ4F_freeCompressionContext(df->ctx);
+#endif
+
 	pg_free(df->pathname);
 	pg_free(df->fullpath);
 	if (df->temp_suffix)
@@ -317,6 +460,21 @@ dir_sync(Walfile f)
 			return -1;
 	}
 #endif
+#ifdef HAVE_LIBLZ4
+	if (dir_data->compression_method == COMPRESSION_LZ4)
+	{
+		DirectoryMethodFile *df = (DirectoryMethodFile *) f;
+		size_t		compressed;
+
+		/* Flush any internal buffers */
+		compressed = LZ4F_flush(df->ctx, df->lz4buf, df->lz4bufsize, NULL);
+		if (LZ4F_isError(compressed))
+			return -1;
+
+		if (write(df->fd, df->lz4buf, compressed) != compressed)
+			return -1;
+	}
+#endif
 
 	return fsync(((DirectoryMethodFile *) f)->fd);
 }
diff --git a/src/bin/pg_basebackup/walmethods.h b/src/bin/pg_basebackup/walmethods.h
index 5dfe330ea5..f9b6a1646a 100644
--- a/src/bin/pg_basebackup/walmethods.h
+++ b/src/bin/pg_basebackup/walmethods.h
@@ -22,6 +22,7 @@ typedef enum
 /* Types of compression supported */
 typedef enum
 {
+	COMPRESSION_LZ4,
 	COMPRESSION_GZIP,
 	COMPRESSION_NONE
 } WalCompressionMethod;
-- 
2.25.1
