From 068d2aba4d2fdff18627133653c784913146554b Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekewurm+postgres@gmail.com>
Date: Tue, 24 Jan 2023 12:37:09 +0100
Subject: [PATCH v1] Reduce overhead of small block data in xlog records

We reserve 2 bits in the block ID header for how large the block's registered
data is: 0, 1 or 2 bytes, for 0, 1-255 and 256-UINT16_MAX bytes to the block,
respectively. This reduces the minimum size of any block-referencing xlog
record from 48 bytes to 46 bytes, and reduces the size of any multi-page heap
update record by at least 2 bytes.

It includes more infrastructure for future value-dependent encoding of
length fields, even though it is currently only used in one place.
---
 src/backend/access/transam/xloginsert.c |  13 ++-
 src/backend/access/transam/xlogreader.c |  34 +++++--
 src/include/access/xlog_internal.h      | 114 ++++++++++++++++++++++++
 src/include/access/xlogrecord.h         |  43 +++++++--
 4 files changed, 188 insertions(+), 16 deletions(-)

diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index ea7e2f67af..3ed7cadd58 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -581,6 +581,8 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		XLogRecordBlockHeader bkpb;
 		XLogRecordBlockImageHeader bimg;
 		XLogRecordBlockCompressHeader cbimg = {0};
+		int			data_length = 0;
+		XLogSizeClass data_sizeclass = XLS_EMPTY;
 		bool		samerel;
 		bool		is_compressed = false;
 		bool		include_image;
@@ -622,7 +624,7 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 
 		bkpb.id = block_id;
 		bkpb.fork_flags = regbuf->forkno;
-		bkpb.data_length = 0;
+		data_length = 0;
 
 		if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
 			bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
@@ -785,13 +787,16 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 			 * overall list.
 			 */
 			bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
-			bkpb.data_length = (uint16) regbuf->rdata_len;
+			data_length = (uint16) regbuf->rdata_len;
+			data_sizeclass = XLogLengthToSizeClass(data_length, XLS_UINT16);
 			total_len += regbuf->rdata_len;
 
 			rdt_datas_last->next = regbuf->rdata_head;
 			rdt_datas_last = regbuf->rdata_tail;
 		}
 
+		bkpb.id |= data_sizeclass << XLR_BLOCKID_SZCLASS_SHIFT;
+
 		if (prev_regbuf && RelFileLocatorEquals(regbuf->rlocator, prev_regbuf->rlocator))
 		{
 			samerel = true;
@@ -804,6 +809,10 @@ XLogRecordAssemble(RmgrId rmid, uint8 info,
 		/* Ok, copy the header to the scratch buffer */
 		memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
 		scratch += SizeOfXLogRecordBlockHeader;
+
+		scratch += XLogWriteLength(data_length, data_sizeclass,
+								   XLS_UINT16, scratch);
+
 		if (include_image)
 		{
 			memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 631f260f79..58e96faaa9 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1660,11 +1660,11 @@ DecodeXLogRecord(XLogReaderState *state,
 	 */
 #define COPY_HEADER_FIELD(_dst, _size)			\
 	do {										\
-		if (remaining < _size)					\
+		if (remaining < (_size))				\
 			goto shortdata_err;					\
-		memcpy(_dst, ptr, _size);				\
-		ptr += _size;							\
-		remaining -= _size;						\
+		memcpy((_dst), ptr, (_size));			\
+		ptr += (_size);							\
+		remaining -= (_size);					\
 	} while(0)
 
 	char	   *ptr;
@@ -1690,8 +1690,13 @@ DecodeXLogRecord(XLogReaderState *state,
 	datatotal = 0;
 	while (remaining > datatotal)
 	{
+		XLogSizeClass sizeClass;
 		COPY_HEADER_FIELD(&block_id, sizeof(uint8));
 
+		sizeClass = (block_id & XLR_BLOCKID_SZCLASS_MASK) >>
+														  XLR_BLOCKID_SZCLASS_SHIFT;
+		block_id &= XLR_BLOCK_ID_MASK;
+
 		if (block_id == XLR_BLOCK_ID_DATA_SHORT)
 		{
 			/* XLogRecordDataHeaderShort */
@@ -1755,7 +1760,26 @@ DecodeXLogRecord(XLogReaderState *state,
 
 			blk->prefetch_buffer = InvalidBuffer;
 
-			COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
+			{
+				Size		read;
+				uint32		length = 0;
+				read = XLogReadLength(&length, sizeClass,
+									  XLS_UINT16,
+									  ptr, remaining);
+
+				if (read < 0)
+				{
+					report_invalid_record(state,
+										  "Could not read length from record at %X/%X",
+										  LSN_FORMAT_ARGS(state->ReadRecPtr));
+					goto err;
+				}
+
+				ptr += read;
+				remaining -= read;
+				blk->data_len = length;
+			}
+
 			/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
 			if (blk->has_data && blk->data_len == 0)
 			{
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index b0fd338a00..42b3c66547 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -328,6 +328,120 @@ typedef enum
 struct LogicalDecodingContext;
 struct XLogRecordBuffer;
 
+/*
+ * XLogSizeClass interactions
+ */
+
+/*
+ * XLogLengthToSizeClass
+ * Turns a length into a size class
+ * MaxSizeClass is used to allow the compiler to eliminate branches that
+ * can never be hit in the caller's code path; e.g. when processing uint16.
+ */
+static inline XLogSizeClass XLogLengthToSizeClass(uint32 length,
+												  const XLogSizeClass maxSizeClass)
+{
+	XLogSizeClass sizeClass;
+
+	if (length == 0)
+		sizeClass = XLS_EMPTY;
+	else if (length <= UINT8_MAX  && maxSizeClass >= XLS_UINT8)
+		sizeClass = XLS_UINT8;
+	else if (length <= UINT16_MAX && maxSizeClass >= XLS_UINT16)
+		sizeClass = XLS_UINT16;
+	else
+	{
+		Assert(maxSizeClass == XLS_UINT32);
+		sizeClass = XLS_UINT32;
+	}
+
+	Assert(sizeClass <= maxSizeClass);
+
+	return sizeClass;
+}
+
+/*
+ * XLogWriteLength
+ *
+ * Write a length with size determined by sizeClass into the output.
+ * Returns the Size of bytes written. The user is responsible for making
+ * sure that out-of-bounds write is impossible.
+ * Writes at most 4 bytes.
+ */
+static inline Size XLogWriteLength(uint32 length, XLogSizeClass sizeClass,
+								   const XLogSizeClass maxSizeClass,
+								   char *output)
+{
+	Size written = -1;
+
+	Assert(sizeClass <= maxSizeClass &&
+		   XLogLengthToSizeClass(length, maxSizeClass) == sizeClass);
+
+#define WRITE_OP(caseSizeClass, field_type) \
+		case caseSizeClass: \
+			if ((caseSizeClass) <= maxSizeClass) \
+			{ \
+				field_type typedLength = length; \
+				memcpy(output, &typedLength, sizeof(field_type)); \
+				written = sizeof(field_type); \
+			} \
+			break
+
+	switch (sizeClass) {
+		case XLS_EMPTY:
+			written = 0;
+			break;
+		WRITE_OP(XLS_UINT8, uint8);
+		WRITE_OP(XLS_UINT16, uint16);
+		WRITE_OP(XLS_UINT32, uint32);
+	}
+
+#undef WRITE_OP
+	return written;
+}
+
+/*
+ * XLogReadLength
+ *
+ * Read a length with size determined by sizeClass from the input into
+ * *length. Returns the Size of bytes read, or -1 if the input size was
+ * too small to read the relevant length field.
+ */
+static inline Size XLogReadLength(uint32 *length, XLogSizeClass sizeClass,
+								  const XLogSizeClass maxSizeClass,
+								  char *input, Size inSize)
+{
+	Size readSize = -1;
+
+	Assert(sizeClass <= maxSizeClass);
+
+#define READ_OP(caseSizeClass, field_type) \
+		case caseSizeClass: \
+			if ((caseSizeClass) <= maxSizeClass) \
+			{ \
+				field_type typedLength; \
+				if (inSize < sizeof(field_type)) \
+					return -1; \
+				memcpy(&typedLength, input, sizeof(field_type)); \
+				readSize = sizeof(field_type); \
+				*length = typedLength; \
+			} \
+			break
+
+	switch (sizeClass) {
+		case XLS_EMPTY:
+			readSize = 0;
+			*length = 0;
+			break;
+		READ_OP(XLS_UINT8, uint8);
+		READ_OP(XLS_UINT16, uint16);
+		READ_OP(XLS_UINT32, uint32);
+	}
+
+#undef READ_OP
+	return readSize;
+}
+
 /*
  * Method table for resource managers.
  *
diff --git a/src/include/access/xlogrecord.h b/src/include/access/xlogrecord.h
index f355e08e1d..b0aada1031 100644
--- a/src/include/access/xlogrecord.h
+++ b/src/include/access/xlogrecord.h
@@ -17,6 +17,27 @@
 #include "storage/block.h"
 #include "storage/relfilelocator.h"
 
+/*
+ * XLogSizeClass
+ *
+ * XLog data segments registered to e.g. blocks are often quite small.
+ * This XLogSizeClass infrastructure allows us to do variable length encoding
+ * on these fields, so that we may save some bytes in each record on length
+ * fields.
+ *
+ * Note that it is possible to encode smaller sizes in the larger size classes,
+ * but the reverse is not true. For maximum efficiency, it is important to only
+ * use 
+ * 
+ * Values > UINT32_MAX are not supported in this scheme.
+ */
+typedef enum XLogSizeClass {
+	XLS_EMPTY = 0,	/* length == 0 */
+	XLS_UINT8 = 1,		/* length <= UINT8_MAX; stored in uint8 (1B) */
+	XLS_UINT16 = 2,		/* length <= UINT16_MAX; stored in uint16 (2B) */
+	XLS_UINT32 = 3		/* length <= UINT32_MAX; stored in uint32 (4B) */
+} XLogSizeClass;
+
 /*
  * The overall layout of an XLOG record is:
  *		Fixed-size header (XLogRecord struct)
@@ -104,15 +125,14 @@ typedef struct XLogRecordBlockHeader
 {
 	uint8		id;				/* block reference ID */
 	uint8		fork_flags;		/* fork within the relation, and flags */
-	uint16		data_length;	/* number of payload bytes (not including page
-								 * image) */
 
+	/* Depending on XLR_BLOCKID_SZCLASS; 0, 1 or 2 bytes of data_length follow */
 	/* If BKPBLOCK_HAS_IMAGE, an XLogRecordBlockImageHeader struct follows */
 	/* If BKPBLOCK_SAME_REL is not set, a RelFileLocator follows */
 	/* BlockNumber follows */
 } XLogRecordBlockHeader;
 
-#define SizeOfXLogRecordBlockHeader (offsetof(XLogRecordBlockHeader, data_length) + sizeof(uint16))
+#define SizeOfXLogRecordBlockHeader (offsetof(XLogRecordBlockHeader, fork_flags) + sizeof(uint8))
 
 /*
  * Additional header information when a full-page image is included
@@ -226,6 +246,11 @@ typedef struct XLogRecordDataHeaderLong
 
 #define SizeOfXLogRecordDataHeaderLong (sizeof(uint8) + sizeof(uint32))
 
+#define XLR_BLOCK_ID_MASK			0x3F
+
+#define XLR_BLOCKID_SZCLASS_SHIFT	6
+#define XLR_BLOCKID_SZCLASS_MASK	(3 << XLR_BLOCKID_SZCLASS_SHIFT)
+
 /*
  * Block IDs used to distinguish different kinds of record fragments. Block
  * references are numbered from 0 to XLR_MAX_BLOCK_ID. A rmgr is free to use
@@ -234,15 +259,15 @@ typedef struct XLogRecordDataHeaderLong
  * numbers are reserved to denote the "main" data portion of the record,
  * as well as replication-supporting transaction metadata.
  *
- * The maximum is currently set at 32, quite arbitrarily. Most records only
+ * The maximum is currently set at 0x20 (32), quite arbitrarily. Most records only
  * need a handful of block references, but there are a few exceptions that
  * need more.
  */
-#define XLR_MAX_BLOCK_ID			32
+#define XLR_MAX_BLOCK_ID			0x20
 
-#define XLR_BLOCK_ID_DATA_SHORT		255
-#define XLR_BLOCK_ID_DATA_LONG		254
-#define XLR_BLOCK_ID_ORIGIN			253
-#define XLR_BLOCK_ID_TOPLEVEL_XID	252
+#define XLR_BLOCK_ID_DATA_SHORT		0x3F
+#define XLR_BLOCK_ID_DATA_LONG		0x3E
+#define XLR_BLOCK_ID_ORIGIN			0x3D
+#define XLR_BLOCK_ID_TOPLEVEL_XID	0x3C
 
 #endif							/* XLOGRECORD_H */
-- 
2.39.0

