From e0ef41560d8f9a4abd2da5b0c6033836aa096ecc Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Sun, 27 Dec 2020 12:01:35 +0500
Subject: [PATCH v17 7/7] Add Lz4 compression to WAL FPIs

Introduce the wal_compression_method GUC to control which codec is
used to compress full-page images (FPIs) in WAL. The GUC defaults to
pglz to avoid incompatibility between builds. We cannot use custom
compression methods in WAL because custom methods are registered in
a WAL-logged catalog.
NB: this adds a byte to the block image header, so we should probably bump the WAL page magic.
---
 src/backend/access/transam/xlog.c       |  1 +
 src/backend/access/transam/xloginsert.c | 41 ++++++++++++++++++++++---
 src/backend/access/transam/xlogreader.c | 35 +++++++++++++++++++--
 src/backend/utils/misc/guc.c            | 17 ++++++++++
 src/include/access/xlog.h               |  2 ++
 src/include/access/xlogreader.h         |  1 +
 src/include/access/xlogrecord.h         |  7 +++--
 src/include/catalog/pg_am.dat           |  4 +--
 src/include/catalog/pg_proc.dat         |  6 ++--
 9 files changed, 99 insertions(+), 15 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 48ca46a941..eb31a167d1 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -112,6 +112,7 @@ int			CommitDelay = 0;	/* precommit delay in microseconds */
 int			CommitSiblings = 5; /* # concurrent xacts needed to sleep */
 int			wal_retrieve_retry_interval = 5000;
 int			max_slot_wal_keep_size_mb = -1;
+int			wal_compression_method = PGLZ_COMPRESSION_ID;
 
 #ifdef WAL_DEBUG
 bool		XLOG_DEBUG = false;
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index 1f0e4e01e6..a7f4accb1c 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -19,6 +19,7 @@
 
 #include "postgres.h"
 
+#include "access/compressamapi.h"
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "access/xlog_internal.h"
@@ -33,6 +34,10 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
+#ifdef HAVE_LIBLZ4
+#include "lz4.h"
+#endif
+
 /* Buffer size required to store a compressed version of backup block image */
 #define PGLZ_MAX_BLCKSZ PGLZ_MAX_OUTPUT(BLCKSZ)
 
@@ -113,7 +118,8 @@ static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi);
 static bool XLogCompressBackupBlock(char *page, uint16 hole_offset,
-									uint16 hole_length, char *dest, uint16 *dlen);
+									uint16 hole_length, char *dest,
+									uint16 *dlen, CompressionId compression);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -630,11 +636,15 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			 */
 			if (wal_compression)
 			{
+				bimg.compression_method = PGLZ_COMPRESSION_ID;
+#ifdef HAVE_LIBLZ4
+				bimg.compression_method = wal_compression_method;
+#endif
 				is_compressed =
 					XLogCompressBackupBlock(page, bimg.hole_offset,
 											cbimg.hole_length,
 											regbuf->compressed_page,
-											&compressed_len);
+											&compressed_len, bimg.compression_method);
 			}
 
 			/*
@@ -827,7 +837,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
  */
 static bool
 XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
-						char *dest, uint16 *dlen)
+						char *dest, uint16 *dlen, CompressionId compression)
 {
 	int32		orig_len = BLCKSZ - hole_length;
 	int32		len;
@@ -853,12 +863,33 @@ XLogCompressBackupBlock(char *page, uint16 hole_offset, uint16 hole_length,
 	else
 		source = page;
 
+	if (compression == LZ4_COMPRESSION_ID)
+	{
+#ifndef HAVE_LIBLZ4
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("not built with lz4 support")));
+#else
+		len = LZ4_compress_fast(source, dest, orig_len, PGLZ_MAX_BLCKSZ, 1);
+		/*
+		 * LZ4 returns 0 on failure (e.g. output exceeds dest); report as -1.
+		 */
+		if (len <= 0)
+			len = -1;
+#endif
+	}
+	else if (compression == PGLZ_COMPRESSION_ID)
+		len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
+	else
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("unknown compression method requested")));
+
 	/*
-	 * We recheck the actual size even if pglz_compress() reports success and
+	 * We recheck the actual size even if compression reports success and
 	 * see if the number of bytes saved by compression is larger than the
 	 * length of extra data needed for the compressed version of block image.
 	 */
-	len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
 	if (len >= 0 &&
 		len + extra_bytes < orig_len)
 	{
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index a63ad8cfd0..392ffb0343 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -33,6 +33,10 @@
 #include "utils/memutils.h"
 #endif
 
+#ifdef HAVE_LIBLZ4
+#include "lz4.h"
+#endif
+
 static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
 			pg_attribute_printf(2, 3);
 static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
@@ -1291,6 +1295,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
 			{
 				COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16));
 				COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
+				COPY_HEADER_FIELD(&blk->compression_method, sizeof(uint8));
 				COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8));
 
 				blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0);
@@ -1565,8 +1570,34 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 	if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
 	{
 		/* If a backup block image is compressed, decompress it */
-		if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-							BLCKSZ - bkpb->hole_length, true) < 0)
+		int32		raw_len = BLCKSZ - bkpb->hole_length;
+		int32		decomp_result = -1;
+		if (bkpb->compression_method == PGLZ_COMPRESSION_ID)
+			decomp_result = pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
+											raw_len, true);
+		else if (bkpb->compression_method == LZ4_COMPRESSION_ID)
+		{
+#ifndef HAVE_LIBLZ4
+			report_invalid_record(record, "image at %X/%X is compressed with "
+									"lz4 not compiled into this build, block %d",
+								  (uint32) (record->ReadRecPtr >> 32),
+								  (uint32) record->ReadRecPtr, block_id);
+			return false;
+#else
+			/* As with pglz's check_complete, insist on the exact image size */
+			decomp_result = LZ4_decompress_safe(ptr, tmp.data, bkpb->bimg_len, raw_len);
+			if (decomp_result != raw_len)
+				decomp_result = -1;
+#endif
+		}
+		else
+		{
+			report_invalid_record(record, "image at %X/%X is compressed with unknown codec, block %d",
+								  (uint32) (record->ReadRecPtr >> 32),
+								  (uint32) record->ReadRecPtr, block_id);
+			return false;
+		}
+		if (decomp_result < 0)
 		{
 			report_invalid_record(record, "invalid compressed image at %X/%X, block %d",
 								  (uint32) (record->ReadRecPtr >> 32),
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bbaf037bc6..8ad0267f30 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -30,6 +30,7 @@
 #include <unistd.h>
 
 #include "access/commit_ts.h"
+#include "access/compressamapi.h"
 #include "access/gin.h"
 #include "access/rmgr.h"
 #include "access/tableam.h"
@@ -483,6 +484,12 @@ const struct config_enum_entry ssl_protocol_versions_info[] = {
 	{NULL, 0, false}
 };
 
+const struct config_enum_entry wal_compression_options[] = {
+	{"pglz", PGLZ_COMPRESSION_ID, false},
+	{"lz4", LZ4_COMPRESSION_ID, false},
+	{NULL, 0, false}
+};
+
 StaticAssertDecl(lengthof(ssl_protocol_versions_info) == (PG_TLS1_3_VERSION + 2),
 				 "array length mismatch");
 
@@ -4679,6 +4686,16 @@ static struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"wal_compression_method", PGC_SUSET, WAL_SETTINGS,
+			gettext_noop("Set the method used to compress full page images in the WAL."),
+			NULL
+		},
+		&wal_compression_method,
+		PGLZ_COMPRESSION_ID, wal_compression_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"dynamic_shared_memory_type", PGC_POSTMASTER, RESOURCES_MEM,
 			gettext_noop("Selects the dynamic shared memory implementation used."),
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 221af87e71..5099c07b63 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -11,6 +11,7 @@
 #ifndef XLOG_H
 #define XLOG_H
 
+#include "access/compressamapi.h"
 #include "access/rmgr.h"
 #include "access/xlogdefs.h"
 #include "access/xloginsert.h"
@@ -175,6 +176,7 @@ typedef enum RecoveryState
 } RecoveryState;
 
 extern PGDLLIMPORT int wal_level;
+extern PGDLLIMPORT int wal_compression_method;
 
 /* Is WAL archiving enabled (always or only while server is running normally)? */
 #define XLogArchivingActive() \
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 0b6d00dd7d..791b6f435c 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -133,6 +133,7 @@ typedef struct
 	bool		apply_image;	/* has image that should be restored */
 	char	   *bkp_image;
 	uint16		hole_offset;
+	uint8		compression_method;
 	uint16		hole_length;
 	uint16		bimg_len;
 	uint8		bimg_info;
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index 2f0c8bf589..41fe34c17c 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -129,9 +129,10 @@ typedef struct XLogRecordBlockHeader
  */
 typedef struct XLogRecordBlockImageHeader
 {
-	uint16		length;			/* number of page image bytes */
-	uint16		hole_offset;	/* number of bytes before "hole" */
-	uint8		bimg_info;		/* flag bits, see below */
+	uint16		length;				/* number of page image bytes */
+	uint16		hole_offset;		/* number of bytes before "hole" */
+	uint8		compression_method; /* compression method used for image */
+	uint8		bimg_info;			/* flag bits, see below */
 
 	/*
 	 * If BKPIMAGE_HAS_HOLE and BKPIMAGE_IS_COMPRESSED, an
diff --git a/src/include/catalog/pg_am.dat b/src/include/catalog/pg_am.dat
index 3a8d0ac09c..a984eeeadf 100644
--- a/src/include/catalog/pg_am.dat
+++ b/src/include/catalog/pg_am.dat
@@ -33,10 +33,10 @@
 { oid => '3580', oid_symbol => 'BRIN_AM_OID',
   descr => 'block range index (BRIN) access method',
   amname => 'brin', amhandler => 'brinhandler', amtype => 'i' },
-{ oid => '4225', oid_symbol => 'PGLZ_COMPRESSION_AM_OID',
+{ oid => '8022', oid_symbol => 'PGLZ_COMPRESSION_AM_OID',
   descr => 'pglz compression access method',
   amname => 'pglz', amhandler => 'pglzhandler', amtype => 'c' },
-{ oid => '4226', oid_symbol => 'LZ4_COMPRESSION_AM_OID',
+{ oid => '8023', oid_symbol => 'LZ4_COMPRESSION_AM_OID',
   descr => 'lz4 compression access method',
   amname => 'lz4', amhandler => 'lz4handler', amtype => 'c' },  
 ]
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index d851619543..3148285f45 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -941,11 +941,11 @@
   prorettype => 'void', proargtypes => 'regclass int8',
   prosrc => 'brin_desummarize_range' },
 
-{ oid => '4388', descr => 'pglz compression access method handler',
+{ oid => '8024', descr => 'pglz compression access method handler',
   proname => 'pglzhandler', provolatile => 'v',
   prorettype => 'compression_am_handler', proargtypes => 'internal',
   prosrc => 'pglzhandler' },
-{ oid => '4389', descr => 'lz4 compression access method handler',
+{ oid => '8021', descr => 'lz4 compression access method handler',
   proname => 'lz4handler', provolatile => 'v',
   prorettype => 'compression_am_handler', proargtypes => 'internal',
   prosrc => 'lz4handler' },
@@ -7056,7 +7056,7 @@
   descr => 'bytes required to store the value, perhaps with compression',
   proname => 'pg_column_size', provolatile => 's', prorettype => 'int4',
   proargtypes => 'any', prosrc => 'pg_column_size' },
-{ oid => '2228',
+{ oid => '8025',
   descr => 'compression method for the compressed datum',
   proname => 'pg_column_compression', provolatile => 's', prorettype => 'text',
   proargtypes => 'any', prosrc => 'pg_column_compression' },
-- 
2.24.3 (Apple Git-128)

