From 7d4099a7f46a516e75816a7006827ab655fcfbf1 Mon Sep 17 00:00:00 2001
From: Andrey Borodin <amborodin@acm.org>
Date: Wed, 8 Jan 2025 16:39:20 +0500
Subject: [PATCH v0] Compress big WAL records

This approach replaces FPI compression
---
 src/backend/access/transam/xlog.c             |   4 +-
 src/backend/access/transam/xloginsert.c       | 387 +++++++++---------
 src/backend/access/transam/xlogreader.c       | 207 ++++++----
 src/backend/utils/misc/guc_tables.c           |   2 +-
 src/include/access/xloginsert.h               |   1 +
 src/include/access/xlogreader.h               |   2 +
 src/include/access/xlogrecord.h               |  20 +-
 src/include/pg_config_manual.h                |   2 +-
 .../recovery/t/026_overwrite_contrecord.pl    |   2 +-
 9 files changed, 326 insertions(+), 301 deletions(-)

diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index bf3dbda901..5674341368 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -121,7 +121,7 @@ char	   *XLogArchiveCommand = NULL;
 bool		EnableHotStandby = false;
 bool		fullPageWrites = true;
 bool		wal_log_hints = false;
-int			wal_compression = WAL_COMPRESSION_NONE;
+int			wal_compression = WAL_COMPRESSION_ZSTD;
 char	   *wal_consistency_checking_string = NULL;
 bool	   *wal_consistency_checking = NULL;
 bool		wal_init_zero = true;
@@ -1031,7 +1031,7 @@ XLogInsertRecord(XLogRecData *rdata,
 		/* We also need temporary space to decode the record. */
 		record = (XLogRecord *) recordBuf.data;
 		decoded = (DecodedXLogRecord *)
-			palloc(DecodeXLogRecordRequiredSpace(record->xl_tot_len));
+			palloc(DecodeXLogRecordRequiredSpace(XLogGetRecordTotalLen(record)));
 
 		if (!debug_reader)
 			debug_reader = XLogReaderAllocate(wal_segment_size, NULL,
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index efed097092..ac57008b71 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -40,27 +40,6 @@
 #include "storage/proc.h"
 #include "utils/memutils.h"
 
-/*
- * Guess the maximum buffer size required to store a compressed version of
- * backup block image.
- */
-#ifdef USE_LZ4
-#define	LZ4_MAX_BLCKSZ		LZ4_COMPRESSBOUND(BLCKSZ)
-#else
-#define LZ4_MAX_BLCKSZ		0
-#endif
-
-#ifdef USE_ZSTD
-#define ZSTD_MAX_BLCKSZ		ZSTD_COMPRESSBOUND(BLCKSZ)
-#else
-#define ZSTD_MAX_BLCKSZ		0
-#endif
-
-#define PGLZ_MAX_BLCKSZ		PGLZ_MAX_OUTPUT(BLCKSZ)
-
-/* Buffer size required to store a compressed version of backup block image */
-#define COMPRESS_BUFSIZE	Max(Max(PGLZ_MAX_BLCKSZ, LZ4_MAX_BLCKSZ), ZSTD_MAX_BLCKSZ)
-
 /*
  * For each block reference registered with XLogRegisterBuffer, we fill in
  * a registered_buffer struct.
@@ -81,9 +60,6 @@ typedef struct
 
 	XLogRecData bkp_rdatas[2];	/* temporary rdatas used to hold references to
 								 * backup block data in XLogRecordAssemble() */
-
-	/* buffer to store a compressed version of backup block image */
-	char		compressed_page[COMPRESS_BUFSIZE];
 } registered_buffer;
 
 static registered_buffer *registered_buffers;
@@ -113,6 +89,16 @@ static uint8 curinsert_flags = 0;
 static XLogRecData hdr_rdt;
 static char *hdr_scratch = NULL;
 
+/*
+ * Compression buffers are kept in StringInfos for the prototype.
+ * compression_buffer_current_size == -1 means we entered a critical section
+ * before we could prepare the compression buffers.
+ */
+static int32			compression_buffer_current_size;
+static XLogRecData		compressed_rdt_hdr;
+static StringInfo		data_before_compression = NULL;
+static StringInfo		compressed_data = NULL;
+
 #define SizeOfXlogOrigin	(sizeof(RepOriginId) + sizeof(char))
 #define SizeOfXLogTransactionId	(sizeof(TransactionId) + sizeof(char))
 
@@ -137,9 +123,7 @@ static MemoryContext xloginsert_cxt;
 static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
 									   XLogRecPtr RedoRecPtr, bool doPageWrites,
 									   XLogRecPtr *fpw_lsn, int *num_fpi,
-									   bool *topxid_included);
-static bool XLogCompressBackupBlock(const char *page, uint16 hole_offset,
-									uint16 hole_length, char *dest, uint16 *dlen);
+									   bool *topxid_included, uint64 *rec_size);
 
 /*
  * Begin constructing a WAL record. This must be called before the
@@ -160,6 +144,11 @@ XLogBeginInsert(void)
 		elog(ERROR, "XLogBeginInsert was already called");
 
 	begininsert_called = true;
+
+	if (data_before_compression)
+		resetStringInfo(data_before_compression);
+	if (compressed_data)
+		resetStringInfo(compressed_data);
 }
 
 /*
@@ -231,6 +220,7 @@ XLogResetInsertion(void)
 	mainrdata_len = 0;
 	mainrdata_last = (XLogRecData *) &mainrdata_head;
 	curinsert_flags = 0;
+	compression_buffer_current_size = 0;
 	begininsert_called = false;
 }
 
@@ -299,6 +289,8 @@ XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
 #endif
 
 	regbuf->in_use = true;
+
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -352,6 +344,7 @@ XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator, ForkNumber forknum,
 #endif
 
 	regbuf->in_use = true;
+	XLogEnsureCompressionBuffer(MaxSizeOfXLogRecordBlockHeader + BLCKSZ);
 }
 
 /*
@@ -386,6 +379,7 @@ XLogRegisterData(const char *data, uint32 len)
 	mainrdata_last = rdata;
 
 	mainrdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -440,6 +434,7 @@ XLogRegisterBufData(uint8 block_id, const char *data, uint32 len)
 	regbuf->rdata_tail->next = rdata;
 	regbuf->rdata_tail = rdata;
 	regbuf->rdata_len += len;
+	XLogEnsureCompressionBuffer(len);
 }
 
 /*
@@ -459,6 +454,142 @@ XLogSetRecordFlags(uint8 flags)
 	curinsert_flags |= flags;
 }
 
+/*
+ * Make sure the compression buffers can take extraLen more bytes of record.
+ * This cannot wait until XLogInsert(), which runs inside a critical section.
+ */
+void
+XLogEnsureCompressionBuffer(uint32 extraLen)
+{
+	uint64		raw_size;
+	uint64		comp_size;
+
+	if (wal_compression == WAL_COMPRESSION_NONE)
+		return;
+
+	/* HEADER_SCRATCH_SIZE bounds every header XLogRecordAssemble() can add */
+	raw_size = (uint64) extraLen + HEADER_SCRATCH_SIZE + Max(compression_buffer_current_size, 0);
+	comp_size = PGLZ_MAX_OUTPUT(raw_size);
+#ifdef USE_LZ4
+	comp_size = Max(comp_size, LZ4_COMPRESSBOUND(raw_size));
+#endif
+#ifdef USE_ZSTD
+	comp_size = Max(comp_size, ZSTD_COMPRESSBOUND(raw_size));
+#endif
+	comp_size += sizeof(XLogCompressionData);
+
+	/* Cannot allocate in a critical section or past MaxAllocSize: give up */
+	if (CritSectionCount > 0 || compression_buffer_current_size == -1 ||
+		comp_size >= MaxAllocSize)
+	{
+		compression_buffer_current_size = -1;
+		return;
+	}
+	compression_buffer_current_size += extraLen;
+	Assert(data_before_compression->len == 0 && compressed_data->len == 0);
+	enlargeStringInfo(data_before_compression, raw_size);
+	enlargeStringInfo(compressed_data, comp_size);
+}
+
+/*
+ * Compress an assembled record into the preallocated compression buffers.
+ * Returns NULL when the record has to be inserted uncompressed.
+ */
+static XLogRecData *
+XLogCompressRdt(XLogRecData *rdt)
+{
+	XLogCompressionData *compressed_header;
+	XLogRecord *src_header;
+	uint32		orig_len;
+	uint32		dst_cap;
+	size_t		compr_len = 0;	/* wide enough to keep zstd error codes intact */
+
+	if (compression_buffer_current_size == -1)
+		return NULL;
+	Assert(wal_compression != WAL_COMPRESSION_NONE);
+	Assert(compression_buffer_current_size <= data_before_compression->maxlen);
+
+	/* XLogInsert() may loop and call us again: drop the previous attempt */
+	resetStringInfo(data_before_compression);
+	for (; rdt != NULL; rdt = rdt->next)
+		appendBinaryStringInfoNT(data_before_compression, rdt->data, rdt->len);
+
+	/* Zero the header first so that struct padding never leaks into WAL */
+	src_header = (XLogRecord *) data_before_compression->data;
+	compressed_header = (XLogCompressionData *) compressed_data->data;
+	memset(compressed_header, 0, sizeof(XLogCompressionData));
+	compressed_header->record_header = *src_header;
+	compressed_header->decompressed_length = data_before_compression->len;
+	/* Output starts after our header, so its capacity is that much smaller */
+	orig_len = data_before_compression->len - SizeOfXLogRecord;
+	dst_cap = compressed_data->maxlen - sizeof(XLogCompressionData);
+
+	switch ((WalCompression) wal_compression)
+	{
+		case WAL_COMPRESSION_PGLZ:
+			compressed_header->method = BKPIMAGE_COMPRESS_PGLZ;
+			compr_len = pglz_compress((char *) &src_header[1], orig_len, (char *) &compressed_header[1], PGLZ_strategy_default);
+			break;
+
+		case WAL_COMPRESSION_LZ4:
+#ifdef USE_LZ4
+			compressed_header->method = BKPIMAGE_COMPRESS_LZ4;
+			compr_len = LZ4_compress_default((char *) &src_header[1], (char *) &compressed_header[1],
+											 orig_len, dst_cap);
+#else
+			elog(ERROR, "LZ4 is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_ZSTD:
+#ifdef USE_ZSTD
+			compressed_header->method = BKPIMAGE_COMPRESS_ZSTD;
+			compr_len = ZSTD_compress((char *) &compressed_header[1], dst_cap,
+									  (char *) &src_header[1], orig_len, ZSTD_CLEVEL_DEFAULT);
+#else
+			elog(ERROR, "zstd is not supported by this build");
+#endif
+			break;
+
+		case WAL_COMPRESSION_NONE:
+			Assert(false);		/* cannot happen */
+			break;
+			/* no default case, so that compiler will warn */
+	}
+
+	/* pglz fails with -1, LZ4 with 0, zstd with a huge code; none may pass */
+	if (compr_len == 0 || compr_len >= orig_len ||
+		compr_len + sizeof(XLogCompressionData) >= (size_t) data_before_compression->len)
+		return NULL;
+
+	compressed_header->record_header.xl_tot_len = sizeof(XLogCompressionData) + compr_len;
+	compressed_header->record_header.xl_info |= XLR_COMPRESSED;
+	compressed_rdt_hdr.data = compressed_data->data;
+	compressed_rdt_hdr.len = compressed_header->record_header.xl_tot_len;
+	compressed_rdt_hdr.next = NULL;
+	return &compressed_rdt_hdr;
+}
+
+/* Checksum an assembled record, possibly compressed, and store it in xl_crc */
+static void
+XLogChecksumRecord(XLogRecData *rdt)
+{
+	XLogRecord *rechdr = (XLogRecord *) rdt->data;
+	XLogRecData *cur;
+	pg_crc32c	crc;
+
+	/*
+	 * The record header is left out of the CRC for now because the prev-link
+	 * is not known yet.  The final CRC therefore covers the record in the
+	 * order: rdata, then backup blocks, then record header.
+	 */
+	INIT_CRC32C(crc);
+	COMP_CRC32C(crc, rdt->data + SizeOfXLogRecord, rdt->len - SizeOfXLogRecord);
+	for (cur = rdt->next; cur != NULL; cur = cur->next)
+		COMP_CRC32C(crc, cur->data, cur->len);
+	rechdr->xl_crc = crc;
+}
+
 /*
  * Insert an XLOG record having the specified RMID and info bytes, with the
  * body of the record being the data and buffer references registered earlier
@@ -509,6 +640,8 @@ XLogInsert(RmgrId rmid, uint8 info)
 		XLogRecPtr	fpw_lsn;
 		XLogRecData *rdt;
 		int			num_fpi = 0;
+		uint64		rec_size;
+
 
 		/*
 		 * Get values needed to decide whether to do full-page writes. Since
@@ -518,7 +651,18 @@ XLogInsert(RmgrId rmid, uint8 info)
 		GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
 
 		rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
-								 &fpw_lsn, &num_fpi, &topxid_included);
+								 &fpw_lsn, &num_fpi, &topxid_included,
+								 &rec_size);
+
+		// TODO: Invent good name for a GUC controlling compression threshold for a record size
+		if (rec_size > 512 && wal_compression != WAL_COMPRESSION_NONE)
+		{
+			XLogRecData *rdt_compressed = XLogCompressRdt(rdt);
+			if (rdt_compressed != NULL)
+				rdt = rdt_compressed;
+		}
+
+		XLogChecksumRecord(rdt);
 
 		EndPos = XLogInsertRecord(rdt, fpw_lsn, curinsert_flags, num_fpi,
 								  topxid_included);
@@ -547,12 +691,11 @@ XLogInsert(RmgrId rmid, uint8 info)
 static XLogRecData *
 XLogRecordAssemble(RmgrId rmid, uint8 info,
 				   XLogRecPtr RedoRecPtr, bool doPageWrites,
-				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included)
+				   XLogRecPtr *fpw_lsn, int *num_fpi, bool *topxid_included,
+				   uint64 *rec_size)
 {
-	XLogRecData *rdt;
 	uint64		total_len = 0;
 	int			block_id;
-	pg_crc32c	rdata_crc;
 	registered_buffer *prev_regbuf = NULL;
 	XLogRecData *rdt_datas_last;
 	XLogRecord *rechdr;
@@ -593,9 +736,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		bool		needs_data;
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
-		XLogRecordBlockCompressHeader cbimg = {0};
+		uint32		hole_length;
 		bool		samerel;
-		bool		is_compressed = false;
 		bool		include_image;
 
 		if (!regbuf->in_use)
@@ -649,7 +791,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		if (include_image)
 		{
 			const char *page = regbuf->page;
-			uint16		compressed_len = 0;
 
 			/*
 			 * The page needs to be backed up, so calculate its hole length
@@ -666,32 +807,20 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					upper <= BLCKSZ)
 				{
 					bimg.hole_offset = lower;
-					cbimg.hole_length = upper - lower;
+					hole_length = upper - lower;
 				}
 				else
 				{
 					/* No "hole" to remove */
 					bimg.hole_offset = 0;
-					cbimg.hole_length = 0;
+					hole_length = 0;
 				}
 			}
 			else
 			{
 				/* Not a standard page header, don't try to eliminate "hole" */
 				bimg.hole_offset = 0;
-				cbimg.hole_length = 0;
-			}
-
-			/*
-			 * Try to compress a block image if wal_compression is enabled
-			 */
-			if (wal_compression != WAL_COMPRESSION_NONE)
-			{
-				is_compressed =
-					XLogCompressBackupBlock(page, bimg.hole_offset,
-											cbimg.hole_length,
-											regbuf->compressed_page,
-											&compressed_len);
+				hole_length = 0;
 			}
 
 			/*
@@ -709,7 +838,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			rdt_datas_last->next = &regbuf->bkp_rdatas[0];
 			rdt_datas_last = rdt_datas_last->next;
 
-			bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
+			bimg.bimg_info = (hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
 
 			/*
 			 * If WAL consistency checking is enabled for the resource manager
@@ -720,48 +849,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			if (needs_backup)
 				bimg.bimg_info |= BKPIMAGE_APPLY;
 
-			if (is_compressed)
-			{
-				/* The current compression is stored in the WAL record */
-				bimg.length = compressed_len;
-
-				/* Set the compression method used for this block */
-				switch ((WalCompression) wal_compression)
-				{
-					case WAL_COMPRESSION_PGLZ:
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_PGLZ;
-						break;
-
-					case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_LZ4;
-#else
-						elog(ERROR, "LZ4 is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-						bimg.bimg_info |= BKPIMAGE_COMPRESS_ZSTD;
-#else
-						elog(ERROR, "zstd is not supported by this build");
-#endif
-						break;
-
-					case WAL_COMPRESSION_NONE:
-						Assert(false);	/* cannot happen */
-						break;
-						/* no default case, so that compiler will warn */
-				}
-
-				rdt_datas_last->data = regbuf->compressed_page;
-				rdt_datas_last->len = compressed_len;
-			}
-			else
 			{
-				bimg.length = BLCKSZ - cbimg.hole_length;
+				bimg.length = BLCKSZ - hole_length;
 
-				if (cbimg.hole_length == 0)
+				if (hole_length == 0)
 				{
 					rdt_datas_last->data = page;
 					rdt_datas_last->len = BLCKSZ;
@@ -776,9 +867,9 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 					rdt_datas_last = rdt_datas_last->next;
 
 					rdt_datas_last->data =
-						page + (bimg.hole_offset + cbimg.hole_length);
+						page + (bimg.hole_offset + hole_length);
 					rdt_datas_last->len =
-						BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
+						BLCKSZ - (bimg.hole_offset + hole_length);
 				}
 			}
 
@@ -821,12 +912,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
 			scratch += SizeOfXLogRecordBlockImageHeader;
-			if (cbimg.hole_length != 0 && is_compressed)
-			{
-				memcpy(scratch, &cbimg,
-					   SizeOfXLogRecordBlockCompressHeader);
-				scratch += SizeOfXLogRecordBlockCompressHeader;
-			}
 		}
 		if (!samerel)
 		{
@@ -892,19 +977,6 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	hdr_rdt.len = (scratch - hdr_scratch);
 	total_len += hdr_rdt.len;
 
-	/*
-	 * Calculate CRC of the data
-	 *
-	 * Note that the record header isn't added into the CRC initially since we
-	 * don't know the prev-link yet.  Thus, the CRC will represent the CRC of
-	 * the whole record in the order: rdata, then backup blocks, then record
-	 * header.
-	 */
-	INIT_CRC32C(rdata_crc);
-	COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
-	for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
-		COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
-
 	/*
 	 * Ensure that the XLogRecord is not too large.
 	 *
@@ -928,92 +1000,11 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 	rechdr->xl_info = info;
 	rechdr->xl_rmid = rmid;
 	rechdr->xl_prev = InvalidXLogRecPtr;
-	rechdr->xl_crc = rdata_crc;
+	rechdr->xl_crc = 0;
 
-	return &hdr_rdt;
-}
+	*rec_size = rechdr->xl_tot_len;
 
-/*
- * Create a compressed version of a backup block image.
- *
- * Returns false if compression fails (i.e., compressed result is actually
- * bigger than original). Otherwise, returns true and sets 'dlen' to
- * the length of compressed block image.
- */
-static bool
-XLogCompressBackupBlock(const char *page, uint16 hole_offset, uint16 hole_length,
-						char *dest, uint16 *dlen)
-{
-	int32		orig_len = BLCKSZ - hole_length;
-	int32		len = -1;
-	int32		extra_bytes = 0;
-	const char *source;
-	PGAlignedBlock tmp;
-
-	if (hole_length != 0)
-	{
-		/* must skip the hole */
-		memcpy(tmp.data, page, hole_offset);
-		memcpy(tmp.data + hole_offset,
-			   page + (hole_offset + hole_length),
-			   BLCKSZ - (hole_length + hole_offset));
-		source = tmp.data;
-
-		/*
-		 * Extra data needs to be stored in WAL record for the compressed
-		 * version of block image if the hole exists.
-		 */
-		extra_bytes = SizeOfXLogRecordBlockCompressHeader;
-	}
-	else
-		source = page;
-
-	switch ((WalCompression) wal_compression)
-	{
-		case WAL_COMPRESSION_PGLZ:
-			len = pglz_compress(source, orig_len, dest, PGLZ_strategy_default);
-			break;
-
-		case WAL_COMPRESSION_LZ4:
-#ifdef USE_LZ4
-			len = LZ4_compress_default(source, dest, orig_len,
-									   COMPRESS_BUFSIZE);
-			if (len <= 0)
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "LZ4 is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_ZSTD:
-#ifdef USE_ZSTD
-			len = ZSTD_compress(dest, COMPRESS_BUFSIZE, source, orig_len,
-								ZSTD_CLEVEL_DEFAULT);
-			if (ZSTD_isError(len))
-				len = -1;		/* failure */
-#else
-			elog(ERROR, "zstd is not supported by this build");
-#endif
-			break;
-
-		case WAL_COMPRESSION_NONE:
-			Assert(false);		/* cannot happen */
-			break;
-			/* no default case, so that compiler will warn */
-	}
-
-	/*
-	 * We recheck the actual size even if compression reports success and see
-	 * if the number of bytes saved by compression is larger than the length
-	 * of extra data needed for the compressed version of block image.
-	 */
-	if (len >= 0 &&
-		len + extra_bytes < orig_len)
-	{
-		*dlen = (uint16) len;	/* successful compression */
-		return true;
-	}
-	return false;
+	return &hdr_rdt;
 }
 
 /*
@@ -1389,4 +1380,10 @@ InitXLogInsert(void)
 	if (hdr_scratch == NULL)
 		hdr_scratch = MemoryContextAllocZero(xloginsert_cxt,
 											 HEADER_SCRATCH_SIZE);
+
+	if (data_before_compression == NULL)
+		data_before_compression = makeStringInfo();
+	if (compressed_data == NULL)
+		compressed_data = makeStringInfo();
+	XLogEnsureCompressionBuffer(SizeOfXLogRecord);
 }
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 3596af0617..0f7702350a 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -53,6 +53,7 @@ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record,
 static void ResetDecoder(XLogReaderState *state);
 static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
 							   int segsize, const char *waldir);
+static XLogRecord* XLogDecompressRecordIfNeeded(XLogRecord *record);
 
 /* size of the buffer allocated for error message. */
 #define MAX_ERRORMSG_LEN 1000
@@ -524,6 +525,16 @@ XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversi
 	return NULL;
 }
 
+/* Length of the record once decompressed; xl_tot_len if it is not compressed */
+uint32
+XLogGetRecordTotalLen(XLogRecord *record)
+{
+	if ((record->xl_info & XLR_COMPRESSED) == 0)
+		return record->xl_tot_len;
+
+	return ((XLogCompressionData *) record)->decompressed_length;
+}
+
 static XLogPageReadResult
 XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 {
@@ -532,7 +543,8 @@ XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking)
 	XLogRecPtr	targetPagePtr;
 	bool		randAccess;
 	uint32		len,
-				total_len;
+				total_len_decomp,
+				total_len_phisical;
 	uint32		targetRecOff;
 	uint32		pageHeaderSize;
 	bool		assembled;
@@ -643,7 +655,8 @@ restart:
 	 * whole header.
 	 */
 	record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ);
-	total_len = record->xl_tot_len;
+	total_len_decomp = XLogGetRecordTotalLen(record);
+	total_len_phisical = record->xl_tot_len;
 
 	/*
 	 * If the whole record header is on this page, validate it immediately.
@@ -663,12 +676,12 @@ restart:
 	else
 	{
 		/* There may be no next page if it's too small. */
-		if (total_len < SizeOfXLogRecord)
+		if (total_len_phisical < SizeOfXLogRecord)
 		{
 			report_invalid_record(state,
 								  "invalid record length at %X/%X: expected at least %u, got %u",
 								  LSN_FORMAT_ARGS(RecPtr),
-								  (uint32) SizeOfXLogRecord, total_len);
+								  (uint32) SizeOfXLogRecord, total_len_phisical);
 			goto err;
 		}
 		/* We'll validate the header once we have the next page. */
@@ -681,7 +694,7 @@ restart:
 	 * validated that total_len isn't garbage bytes from a recycled WAL page.
 	 */
 	decoded = XLogReadRecordAlloc(state,
-								  total_len,
+								  total_len_decomp,
 								  false /* allow_oversized */ );
 	if (decoded == NULL && nonblocking)
 	{
@@ -694,7 +707,7 @@ restart:
 	}
 
 	len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
-	if (total_len > len)
+	if (total_len_phisical > len)
 	{
 		/* Need to reassemble record */
 		char	   *contdata;
@@ -724,7 +737,7 @@ restart:
 
 			/* Wait for the next page to become available */
 			readOff = ReadPageInternal(state, targetPagePtr,
-									   Min(total_len - gotlen + SizeOfXLogShortPHD,
+									   Min(total_len_phisical - gotlen + SizeOfXLogShortPHD,
 										   XLOG_BLCKSZ));
 
 			if (readOff == XLREAD_WOULDBLOCK)
@@ -765,12 +778,12 @@ restart:
 			 * we expect there to be left.
 			 */
 			if (pageHeader->xlp_rem_len == 0 ||
-				total_len != (pageHeader->xlp_rem_len + gotlen))
+				total_len_phisical != (pageHeader->xlp_rem_len + gotlen))
 			{
 				report_invalid_record(state,
 									  "invalid contrecord length %u (expected %lld) at %X/%X",
 									  pageHeader->xlp_rem_len,
-									  ((long long) total_len) - gotlen,
+									  ((long long) total_len_phisical) - gotlen,
 									  LSN_FORMAT_ARGS(RecPtr));
 				goto err;
 			}
@@ -813,7 +826,7 @@ restart:
 			 * also cross-checked total_len against xlp_rem_len on the second
 			 * page, and verified xlp_pageaddr on both.
 			 */
-			if (total_len > state->readRecordBufSize)
+			if (total_len_decomp > state->readRecordBufSize)
 			{
 				char		save_copy[XLOG_BLCKSZ * 2];
 
@@ -824,11 +837,11 @@ restart:
 				Assert(gotlen <= lengthof(save_copy));
 				Assert(gotlen <= state->readRecordBufSize);
 				memcpy(save_copy, state->readRecordBuf, gotlen);
-				allocate_recordbuf(state, total_len);
+				allocate_recordbuf(state, total_len_decomp);
 				memcpy(state->readRecordBuf, save_copy, gotlen);
 				buffer = state->readRecordBuf + gotlen;
 			}
-		} while (gotlen < total_len);
+		} while (gotlen < total_len_phisical);
 		Assert(gotheader);
 
 		record = (XLogRecord *) state->readRecordBuf;
@@ -844,7 +857,7 @@ restart:
 	{
 		/* Wait for the record data to become available */
 		readOff = ReadPageInternal(state, targetPagePtr,
-								   Min(targetRecOff + total_len, XLOG_BLCKSZ));
+								   Min(targetRecOff + total_len_phisical, XLOG_BLCKSZ));
 		if (readOff == XLREAD_WOULDBLOCK)
 			return XLREAD_WOULDBLOCK;
 		else if (readOff < 0)
@@ -854,7 +867,7 @@ restart:
 		if (!ValidXLogRecord(state, record, RecPtr))
 			goto err;
 
-		state->NextRecPtr = RecPtr + MAXALIGN(total_len);
+		state->NextRecPtr = RecPtr + MAXALIGN(total_len_phisical);
 
 		state->DecodeRecPtr = RecPtr;
 	}
@@ -878,7 +891,7 @@ restart:
 	{
 		Assert(!nonblocking);
 		decoded = XLogReadRecordAlloc(state,
-									  total_len,
+									  total_len_decomp,
 									  true /* allow_oversized */ );
 		/* allocation should always happen under allow_oversized */
 		Assert(decoded != NULL);
@@ -1646,6 +1659,91 @@ DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
 	return size;
 }
 
+/* Scratch space for decompressed records, shared by all readers */
+static char *decompression_buffer = NULL;
+static uint32 decompression_buffer_len = 0;
+
+/*
+ * Return the record in uncompressed form.  A record without XLR_COMPRESSED
+ * is returned as is; otherwise its payload is decompressed into a static
+ * buffer that is overwritten by the next call.  Returns NULL when the
+ * lengths are implausible, the method is unknown or not supported by this
+ * build, or decompression does not yield exactly the advertised size.
+ *
+ * NOTE(review): no XLogReaderState is available here, so failures are not
+ * reported via report_invalid_record(); DecodeXLogRecord() dereferences the
+ * result unchecked; and the palloc'd buffer is only remembered in a static.
+ */
+static XLogRecord *
+XLogDecompressRecordIfNeeded(XLogRecord *record)
+{
+	XLogCompressionData *src = (XLogCompressionData *) record;
+	XLogRecord *dst_h;
+	char	   *dst;
+	uint32		srclen;
+	uint32		rawlen;
+	int64		decomp_len = -1;
+
+	if ((record->xl_info & XLR_COMPRESSED) == 0)
+		return record;
+
+	/* Both lengths come from WAL: validate before subtracting or allocating */
+	if (record->xl_tot_len <= sizeof(XLogCompressionData) ||
+		src->decompressed_length <= SizeOfXLogRecord ||
+		src->decompressed_length > XLogRecordMaxSize)
+		return NULL;
+	srclen = record->xl_tot_len - sizeof(XLogCompressionData);
+	rawlen = src->decompressed_length - SizeOfXLogRecord;
+
+	if (decompression_buffer_len < src->decompressed_length)
+	{
+		/* Avoid small steps in growths, we compress only big records */
+		uint32		newlen = TYPEALIGN(BLCKSZ, src->decompressed_length);
+
+		/* Forget the old buffer first, in case the allocation errors out */
+		if (decompression_buffer)
+			pfree(decompression_buffer);
+		decompression_buffer = NULL;
+		decompression_buffer_len = 0;
+		decompression_buffer = palloc(newlen + SizeOfXLogRecord);
+		decompression_buffer_len = newlen;
+	}
+
+	/*
+	 * Rebuild the header: only xl_tot_len is replaced, so xl_info still has
+	 * XLR_COMPRESSED set and xl_crc is the one stored with the compressed form.
+	 */
+	dst_h = (XLogRecord *) decompression_buffer;
+	*dst_h = src->record_header;
+	dst_h->xl_tot_len = src->decompressed_length;
+	dst = (char *) &dst_h[1];
+
+	/* rawlen is the exact payload size; pglz checks completeness against it */
+	if (src->method == BKPIMAGE_COMPRESS_PGLZ)
+		decomp_len = pglz_decompress((char *) &src[1], srclen, dst, rawlen, true);
+	else if (src->method == BKPIMAGE_COMPRESS_LZ4)
+	{
+#ifdef USE_LZ4
+		decomp_len = LZ4_decompress_safe((char *) &src[1], dst, srclen, rawlen);
+#endif
+	}
+	else if (src->method == BKPIMAGE_COMPRESS_ZSTD)
+	{
+#ifdef USE_ZSTD
+		size_t		res = ZSTD_decompress(dst, rawlen, (char *) &src[1], srclen);
+
+		if (!ZSTD_isError(res))
+			decomp_len = (int64) res;
+#endif
+	}
+
+	/* Covers unknown or unsupported methods, failures and short output */
+	if (decomp_len != (int64) rawlen)
+		return NULL;
+
+	return dst_h;
+}
+
 /*
  * Decode a record.  "decoded" must point to a MAXALIGNed memory area that has
  * space for at least DecodeXLogRecordRequiredSpace(record) bytes.  On
@@ -1684,6 +1782,8 @@ DecodeXLogRecord(XLogReaderState *state,
 	RelFileLocator *rlocator = NULL;
 	uint8		block_id;
 
+	record = XLogDecompressRecordIfNeeded(record);
+
 	decoded->header = *record;
 	decoded->lsn = lsn;
 	decoded->next = NULL;
@@ -1794,10 +1894,7 @@ DecodeXLogRecord(XLogReaderState *state,
 
 				if (BKPIMAGE_COMPRESSED(blk->bimg_info))
 				{
-					if (blk->bimg_info & BKPIMAGE_HAS_HOLE)
-						COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
-					else
-						blk->hole_length = 0;
+					Assert(false);
 				}
 				else
 					blk->hole_length = BLCKSZ - blk->bimg_len;
@@ -1836,19 +1933,6 @@ DecodeXLogRecord(XLogReaderState *state,
 					goto err;
 				}
 
-				/*
-				 * Cross-check that bimg_len < BLCKSZ if it is compressed.
-				 */
-				if (BKPIMAGE_COMPRESSED(blk->bimg_info) &&
-					blk->bimg_len == BLCKSZ)
-				{
-					report_invalid_record(state,
-										  "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X",
-										  (unsigned int) blk->bimg_len,
-										  LSN_FORMAT_ARGS(state->ReadRecPtr));
-					goto err;
-				}
-
 				/*
 				 * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is
 				 * set nor COMPRESSED().
@@ -2057,7 +2141,6 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 {
 	DecodedBkpBlock *bkpb;
 	char	   *ptr;
-	PGAlignedBlock tmp;
 
 	if (block_id > record->record->max_block_id ||
 		!record->record->blocks[block_id].in_use)
@@ -2081,63 +2164,7 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
 
 	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
 	{
-		/* If a backup block image is compressed, decompress it */
-		bool		decomp_success = true;
-
-		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
-		{
-			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
-								BLCKSZ - bkpb->hole_length, true) < 0)
-				decomp_success = false;
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
-		{
-#ifdef USE_LZ4
-			if (LZ4_decompress_safe(ptr, tmp.data,
-									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "LZ4",
-								  block_id);
-			return false;
-#endif
-		}
-		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
-		{
-#ifdef USE_ZSTD
-			size_t		decomp_result = ZSTD_decompress(tmp.data,
-														BLCKSZ - bkpb->hole_length,
-														ptr, bkpb->bimg_len);
-
-			if (ZSTD_isError(decomp_result))
-				decomp_success = false;
-#else
-			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  "zstd",
-								  block_id);
-			return false;
-#endif
-		}
-		else
-		{
-			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		if (!decomp_success)
-		{
-			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
-								  LSN_FORMAT_ARGS(record->ReadRecPtr),
-								  block_id);
-			return false;
-		}
-
-		ptr = tmp.data;
+		Assert(false);
 	}
 
 	/* generate page, taking into account hole if necessary */
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index c9d8cd796a..a6b2420204 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -5057,7 +5057,7 @@ struct config_enum ConfigureNamesEnum[] =
 			NULL
 		},
 		&wal_compression,
-		WAL_COMPRESSION_NONE, wal_compression_options,
+		WAL_COMPRESSION_ZSTD, wal_compression_options,
 		NULL, NULL, NULL
 	},
 
diff --git a/src/include/access/xloginsert.h b/src/include/access/xloginsert.h
index 71894262fb..0266e224bb 100644
--- a/src/include/access/xloginsert.h
+++ b/src/include/access/xloginsert.h
@@ -44,6 +44,7 @@ extern void XLogBeginInsert(void);
 extern void XLogSetRecordFlags(uint8 flags);
 extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info);
 extern void XLogEnsureRecordSpace(int max_block_id, int ndatas);
+extern void XLogEnsureCompressionBuffer(uint32 extraLen);
 extern void XLogRegisterData(const char *data, uint32 len);
 extern void XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags);
 extern void XLogRegisterBlock(uint8 block_id, RelFileLocator *rlocator,
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 9738462d3c..0cf64ab1b0 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -375,6 +375,8 @@ extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
 /* Forget error produced by XLogReaderValidatePageHeader(). */
 extern void XLogReaderResetError(XLogReaderState *state);
 
+extern uint32 XLogGetRecordTotalLen(XLogRecord *record);
+
 /*
  * Error information from WALRead that both backend and frontend caller can
  * process.  Currently only errors from pg_pread can be reported.
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index a06833ce0a..d113cd6aa4 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -90,6 +90,8 @@ typedef struct XLogRecord
  */
 #define XLR_CHECK_CONSISTENCY	0x02
 
+#define XLR_COMPRESSED	0x04
+
 /*
  * Header info for block data appended to an XLOG record.
  *
@@ -166,17 +168,14 @@ typedef struct XLogRecordBlockImageHeader
 	((info & (BKPIMAGE_COMPRESS_PGLZ | BKPIMAGE_COMPRESS_LZ4 | \
 			  BKPIMAGE_COMPRESS_ZSTD)) != 0)
 
-/*
- * Extra header information used when page image has "hole" and
- * is compressed.
- */
-typedef struct XLogRecordBlockCompressHeader
+/* Header of a compressed record */
+// TODO: replace sizeof(XLogCompressionData) everywhere
+typedef struct XLogCompressionData
 {
-	uint16		hole_length;	/* number of bytes in "hole" */
-} XLogRecordBlockCompressHeader;
-
-#define SizeOfXLogRecordBlockCompressHeader \
-	sizeof(XLogRecordBlockCompressHeader)
+	XLogRecord record_header;
+	char	method;
+	uint32	decompressed_length;
+} XLogCompressionData;
 
 /*
  * Maximum size of the header for a block reference. This is used to size a
@@ -185,7 +184,6 @@ typedef struct XLogRecordBlockCompressHeader
 #define MaxSizeOfXLogRecordBlockHeader \
 	(SizeOfXLogRecordBlockHeader + \
 	 SizeOfXLogRecordBlockImageHeader + \
-	 SizeOfXLogRecordBlockCompressHeader + \
 	 sizeof(RelFileLocator) + \
 	 sizeof(BlockNumber))
 
diff --git a/src/include/pg_config_manual.h b/src/include/pg_config_manual.h
index 449e50bd78..9ff11e3e6a 100644
--- a/src/include/pg_config_manual.h
+++ b/src/include/pg_config_manual.h
@@ -355,7 +355,7 @@
  * Enable debugging print statements for WAL-related operations; see
  * also the wal_debug GUC var.
  */
-/* #define WAL_DEBUG */
+#define WAL_DEBUG
 
 /*
  * Enable tracing of syncscan operations (see also the trace_syncscan GUC var).
diff --git a/src/test/recovery/t/026_overwrite_contrecord.pl b/src/test/recovery/t/026_overwrite_contrecord.pl
index f408d4f69b..942cf26f3f 100644
--- a/src/test/recovery/t/026_overwrite_contrecord.pl
+++ b/src/test/recovery/t/026_overwrite_contrecord.pl
@@ -56,7 +56,7 @@ $$;
 my $initfile = $node->safe_psql('postgres',
 	'SELECT pg_walfile_name(pg_current_wal_insert_lsn())');
 $node->safe_psql('postgres',
-	qq{SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
+	qq{SET wal_compression to off; SELECT pg_logical_emit_message(true, 'test 026', repeat('xyzxz', 123456))}
 );
 #$node->safe_psql('postgres', qq{create table foo ()});
 my $endfile = $node->safe_psql('postgres',
-- 
2.39.5 (Apple Git-154)

