From 6d907203daf20c4923c3c744d25f8ff4c42802de Mon Sep 17 00:00:00 2001
From: Julien Tachoires <julmon@gmail.com>
Date: Thu, 6 Jun 2024 00:57:38 -0700
Subject: [PATCH] Compress ReorderBuffer spill files using LZ4
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

When the content of a large transaction (size exceeding
logical_decoding_work_mem) and its sub-transactions has to be
reordered during logical decoding, then, all the changes are written
on disk in temporary files located in pg_replslot/<slot_name>.
Decoding very large transactions by multiple replication slots can
lead to disk space saturation and high I/O utilization.

When compiled with LZ4 support (--with-lz4), this patch enables data
compression/decompression of these temporary files. Each transaction
change that must be written on disk (ReorderBufferDiskChange) is now
compressed and encapsulated in a new structure.

3 different compression strategies are implemented:

1. LZ4 streaming compression is the preferred one and works
   efficiently for small individual changes.
2. LZ4 regular compression when the changes are too large for using
   the streaming API.
3. No compression when compression fails, the change is then stored
   not compressed.

When not using compression, the following case generates 1590MB of
spill files:

  CREATE TABLE t (i INTEGER PRIMARY KEY, t TEXT);
  INSERT INTO t
    SELECT i, 'Hello number n°'||i::TEXT
    FROM generate_series(1, 10000000) as i;

With LZ4 compression, it creates 653MB of spill files: 58.9% less
disk space usage.
---
 .../replication/logical/reorderbuffer.c       | 526 ++++++++++++++++--
 src/include/replication/reorderbuffer.h       |  22 +
 2 files changed, 496 insertions(+), 52 deletions(-)

diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 00a8327e77..8ac216a9c8 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -88,6 +88,9 @@
 
 #include <unistd.h>
 #include <sys/stat.h>
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
 
 #include "access/detoast.h"
 #include "access/heapam.h"
@@ -181,6 +184,24 @@ typedef struct ReorderBufferDiskChange
 	/* data follows */
 } ReorderBufferDiskChange;
 
+#ifdef USE_LZ4
+/* Possible reorder buffer ondisk strategies */
+typedef enum ReorderBufferCompressStrat
+{
+	REORDER_BUFFER_NO_COMPRESSION,
+	REORDER_BUFFER_LZ4_STREAMING,
+	REORDER_BUFFER_LZ4,
+} ReorderBufferCompressStrat;
+
+typedef struct ReorderBufferCompressDiskChange
+{
+	ReorderBufferCompressStrat strat;	/* Ondisk compression strategy */
+	Size		size;					/* Ondisk data size */
+	Size		orig_size;				/* Original data size */
+	/* data follows */
+} ReorderBufferCompressDiskChange;
+#endif
+
 #define IsSpecInsert(action) \
 ( \
 	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
@@ -255,6 +276,13 @@ static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *tx
 										 int fd, ReorderBufferChange *change);
 static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 										TXNEntryFile *file, XLogSegNo *segno);
+#ifdef USE_LZ4
+static bool LZ4_ReadOndiskBufferChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+									   TXNEntryFile *file, XLogSegNo *segno);
+#else
+static bool ReadOndiskBufferChange(ReorderBuffer *rb, TXNEntryFile *file,
+								   XLogSegNo *segno);
+#endif
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 									   char *data);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
@@ -428,6 +456,20 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
 	txn->command_id = InvalidCommandId;
 	txn->output_plugin_private = NULL;
 
+#ifdef USE_LZ4
+	/*
+	 * We do not allocate LZ4 resources at this point because we have no
+	 * guarantee that we will need them later. Let's allocate only when we
+	 * are about to use them.
+	 */
+	txn->lz4_in_buf = NULL;
+	txn->lz4_out_buf = NULL;
+	txn->lz4_in_buf_offset = 0;
+	txn->lz4_out_buf_offset = 0;
+	txn->lz4_stream = NULL;
+	txn->lz4_stream_decode = NULL;
+#endif
+
 	return txn;
 }
 
@@ -464,6 +506,31 @@ ReorderBufferReturnTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 		txn->invalidations = NULL;
 	}
 
+#ifdef USE_LZ4
+	if (txn->lz4_in_buf != NULL)
+	{
+		MemoryContext oldcontext = MemoryContextSwitchTo(rb->context);
+
+		pfree(txn->lz4_in_buf);
+		LZ4_freeStream(txn->lz4_stream);
+		txn->lz4_in_buf = NULL;
+		txn->lz4_stream = NULL;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+	if (txn->lz4_out_buf != NULL)
+	{
+		MemoryContext oldcontext = MemoryContextSwitchTo(rb->context);
+
+		pfree(txn->lz4_out_buf);
+		LZ4_freeStreamDecode(txn->lz4_stream_decode);
+		txn->lz4_out_buf = NULL;
+		txn->lz4_stream_decode = NULL;
+
+		MemoryContextSwitchTo(oldcontext);
+	}
+#endif
+
 	/* Reset the toast hash */
 	ReorderBufferToastReset(rb, txn);
 
@@ -3778,6 +3845,15 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 {
 	ReorderBufferDiskChange *ondisk;
 	Size		sz = sizeof(ReorderBufferDiskChange);
+#ifdef USE_LZ4
+	char	   *buf;				/* LZ4/plain buffer */
+	Size		buf_size;			/* LZ4/plain buffer size */
+	char	   *writePtr;
+	Size		write_size;
+	int			lz4_cmp_size = 0;	/* compressed size */
+	ReorderBufferCompressDiskChange *cmp_ondisk;
+	char	   *lz4_in_bufPtr = NULL;
+#endif
 
 	ReorderBufferSerializeReserve(rb, sz);
 
@@ -3957,7 +4033,167 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	errno = 0;
 	pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
+
+#ifdef USE_LZ4
+	/*
+	 * Use LZ4 streaming compression iff we can keep at least 2 plain changes
+	 * into the LZ4 input ring buffer. If plain data size is too large, let's
+	 * use regular LZ4 compression.
+	 */
+	if (sz < (LZ4_RING_BUFFER_SIZE / 2))
+	{
+
+		/*
+		 * Allocate LZ4 resources in ReorderBuffer's memory context.
+		 */
+		if (txn->lz4_in_buf == NULL)
+		{
+			MemoryContext oldcontext = MemoryContextSwitchTo(rb->context);
+
+			txn->lz4_in_buf = (char *) palloc0(LZ4_RING_BUFFER_SIZE);
+			txn->lz4_stream = LZ4_createStream();
+
+			MemoryContextSwitchTo(oldcontext);
+		}
+
+		/* Ring buffer offset wraparound */
+		if ((txn->lz4_in_buf_offset + sz) > LZ4_RING_BUFFER_SIZE)
+			txn->lz4_in_buf_offset = 0;
+
+		/* Get the pointer of the current entry in the ring buffer */
+		lz4_in_bufPtr = txn->lz4_in_buf + txn->lz4_in_buf_offset;
+
+		/* Copy data that should be compressed into LZ4 input ring buffer */
+		memcpy(lz4_in_bufPtr, rb->outbuf, sz);
+
+		/*
+		 * Allocate space for storing the compressed content of the reorder
+		 * buffer output buffer. What we need to write on disk is formed by a
+		 * ReorderBufferCompressDiskChange structure followed by compressed
+		 * data.
+		 */
+		buf_size = LZ4_COMPRESSBOUND(sz) + sizeof(ReorderBufferCompressDiskChange);
+		buf = (char *) palloc0(buf_size);
+
+		/* Use LZ4 streaming compression API */
+		lz4_cmp_size = LZ4_compress_fast_continue(txn->lz4_stream,
+												  lz4_in_bufPtr,
+												  buf + sizeof(ReorderBufferCompressDiskChange),
+												  sz,
+												  buf_size - sizeof(ReorderBufferCompressDiskChange),
+												  1);
+
+		if (lz4_cmp_size > 0)
+		{
+			/* Move the input ring buffer offset */
+			txn->lz4_in_buf_offset += sz;
+
+			cmp_ondisk = (ReorderBufferCompressDiskChange *) buf;
+			cmp_ondisk->strat = REORDER_BUFFER_LZ4_STREAMING;
+			/* Store the original data size (before compression) */
+			cmp_ondisk->orig_size = sz;
+			/*
+			 * Store the ondisk size: compressed size + size of
+			 * ReorderBufferCompressDiskChange.
+			 */
+			cmp_ondisk->size = (Size) lz4_cmp_size + sizeof(ReorderBufferCompressDiskChange);
+
+			/*
+			 * Pointing write pointer and size to buf. buf contains
+			 * ReorderBufferCompressDiskChange followed by compressed data.
+			 */
+			writePtr = buf;
+			write_size = cmp_ondisk->size;
+		}
+		else
+		{
+			/*
+			 * LZ4 streaming compression failed, let's store the change not
+			 * compressed.
+			 */
+			cmp_ondisk = (ReorderBufferCompressDiskChange *) buf;
+			cmp_ondisk->strat = REORDER_BUFFER_NO_COMPRESSION;
+			cmp_ondisk->orig_size = sz;
+			cmp_ondisk->size = sz + sizeof(ReorderBufferCompressDiskChange);
+
+			/*
+			 * Write ReorderBufferCompressDiskChange only, later we will write
+			 * the reorder buffer output content.
+			 */
+			if (write(fd, buf, sizeof(ReorderBufferCompressDiskChange)) != sizeof(ReorderBufferCompressDiskChange))
+			{
+				int			save_errno = errno;
+
+				CloseTransientFile(fd);
+
+				/* if write didn't set errno, assume problem is no disk space */
+				errno = save_errno ? save_errno : ENOSPC;
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not write to data file for XID %u: %m",
+								txn->xid)));
+			}
+
+			/* Pointing write pointer and size to the reorder buffer output */
+			writePtr = rb->outbuf;
+			write_size = sz;
+		}
+	}
+	else
+	/* Regular LZ4 compression */
+	{
+		buf_size = LZ4_COMPRESSBOUND(sz) + sizeof(ReorderBufferCompressDiskChange);
+		buf = (char *) palloc0(buf_size);
+
+		/* Use LZ4 regular compression API */
+		lz4_cmp_size = LZ4_compress_default(rb->outbuf,
+											buf + sizeof(ReorderBufferCompressDiskChange),
+											sz,
+											buf_size - sizeof(ReorderBufferCompressDiskChange));
+
+		if (lz4_cmp_size > 0)
+		{
+			cmp_ondisk = (ReorderBufferCompressDiskChange *) buf;
+			cmp_ondisk->strat = REORDER_BUFFER_LZ4;
+			cmp_ondisk->orig_size = sz;
+			cmp_ondisk->size = (Size) lz4_cmp_size + sizeof(ReorderBufferCompressDiskChange);
+
+			writePtr = buf;
+			write_size = cmp_ondisk->size;
+		}
+		else
+		{
+			/*
+			 * LZ4 regular compression failed, let's store the change not
+			 * compressed.
+			 */
+			cmp_ondisk = (ReorderBufferCompressDiskChange *) buf;
+			cmp_ondisk->strat = REORDER_BUFFER_NO_COMPRESSION;
+			cmp_ondisk->orig_size = sz;
+			cmp_ondisk->size = sz + sizeof(ReorderBufferCompressDiskChange);
+
+			if (write(fd, buf, sizeof(ReorderBufferCompressDiskChange)) != sizeof(ReorderBufferCompressDiskChange))
+			{
+				int			save_errno = errno;
+
+				CloseTransientFile(fd);
+
+				errno = save_errno ? save_errno : ENOSPC;
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not write to data file for XID %u: %m",
+								txn->xid)));
+			}
+
+			writePtr = rb->outbuf;
+			write_size = sz;
+		}
+	}
+
+	if (write(fd, writePtr, write_size) != write_size)
+#else
 	if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
+#endif
 	{
 		int			save_errno = errno;
 
@@ -3984,6 +4220,10 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		txn->final_lsn = change->lsn;
 
 	Assert(ondisk->change.action == change->action);
+
+#ifdef USE_LZ4
+	pfree(buf);
+#endif
 }
 
 /* Returns true, if the output plugin supports streaming, false, otherwise. */
@@ -4252,9 +4492,6 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
 	while (restored < max_changes_in_memory && *segno <= last_segno)
 	{
-		int			readBytes;
-		ReorderBufferDiskChange *ondisk;
-
 		CHECK_FOR_INTERRUPTS();
 
 		if (*fd == -1)
@@ -4293,60 +4530,19 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 		}
 
 		/*
-		 * Read the statically sized part of a change which has information
-		 * about the total size. If we couldn't read a record, we're at the
-		 * end of this file.
+		 * Read the full change from disk.
+		 * If ReadOndiskBufferChange returns false, then we are at the eof, so,
+		 * move the next segment.
 		 */
-		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
-		readBytes = FileRead(file->vfd, rb->outbuf,
-							 sizeof(ReorderBufferDiskChange),
-							 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
-
-		/* eof */
-		if (readBytes == 0)
+#ifdef USE_LZ4
+		if (!LZ4_ReadOndiskBufferChange(rb, txn, file, segno))
+#else
+		if (!ReadOndiskBufferChange(rb, file, segno))
+#endif
 		{
-			FileClose(*fd);
 			*fd = -1;
-			(*segno)++;
 			continue;
 		}
-		else if (readBytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: %m")));
-		else if (readBytes != sizeof(ReorderBufferDiskChange))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
-							readBytes,
-							(uint32) sizeof(ReorderBufferDiskChange))));
-
-		file->curOffset += readBytes;
-
-		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-		ReorderBufferSerializeReserve(rb,
-									  sizeof(ReorderBufferDiskChange) + ondisk->size);
-		ondisk = (ReorderBufferDiskChange *) rb->outbuf;
-
-		readBytes = FileRead(file->vfd,
-							 rb->outbuf + sizeof(ReorderBufferDiskChange),
-							 ondisk->size - sizeof(ReorderBufferDiskChange),
-							 file->curOffset,
-							 WAIT_EVENT_REORDER_BUFFER_READ);
-
-		if (readBytes < 0)
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: %m")));
-		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
-			ereport(ERROR,
-					(errcode_for_file_access(),
-					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
-							readBytes,
-							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
-
-		file->curOffset += readBytes;
 
 		/*
 		 * ok, read a full change from disk, now restore it into proper
@@ -4359,6 +4555,232 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
 	return restored;
 }
 
+#ifdef USE_LZ4
+static bool
+LZ4_ReadOndiskBufferChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+						   TXNEntryFile *file, XLogSegNo *segno)
+{
+	int			readBytes;
+	ReorderBufferCompressDiskChange *cmp_ondisk;
+	char	   *cmp_header;		/* compressed data header */
+	char	   *cmp_data;		/* compressed data */
+	int			decBytes;		/* decompressed data size */
+	char	   *lz4_out_bufPtr;	/* LZ4 ring buffer entry pointer */
+
+	/*
+	 * Read the statically sized part of a change which has information about
+	 * the total size and compression method. If we couldn't read a record,
+	 * we're at the end of this file.
+	 */
+	cmp_header = (char *) palloc0(sizeof(ReorderBufferCompressDiskChange));
+	readBytes = FileRead(file->vfd, cmp_header,
+						 sizeof(ReorderBufferCompressDiskChange),
+						 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
+
+	/* eof */
+	if (readBytes == 0)
+	{
+
+		FileClose(file->vfd);
+		(*segno)++;
+		pfree(cmp_header);
+
+		return false;
+	}
+	else if (readBytes < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: %m")));
+	else if (readBytes != sizeof(ReorderBufferCompressDiskChange))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
+						readBytes,
+						(uint32) sizeof(ReorderBufferCompressDiskChange))));
+
+	file->curOffset += readBytes;
+
+	cmp_ondisk = (ReorderBufferCompressDiskChange *) cmp_header;
+
+	/* Read ondisk data */
+	cmp_data = (char *) palloc0(cmp_ondisk->size - sizeof(ReorderBufferCompressDiskChange));
+	readBytes = FileRead(file->vfd,
+						 cmp_data,
+						 cmp_ondisk->size - sizeof(ReorderBufferCompressDiskChange),
+						 file->curOffset,
+						 WAIT_EVENT_REORDER_BUFFER_READ);
+
+	if (readBytes < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: %m")));
+	else if (readBytes != cmp_ondisk->size - sizeof(ReorderBufferCompressDiskChange))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
+						readBytes,
+						(uint32) (cmp_ondisk->size - sizeof(ReorderBufferCompressDiskChange)))));
+
+	switch (cmp_ondisk->strat)
+	{
+		case REORDER_BUFFER_NO_COMPRESSION:
+			/*
+			 * No compression: make a copy of what was read on disk into the
+			 * reorder buffer.
+			 */
+			ReorderBufferSerializeReserve(rb, cmp_ondisk->orig_size);
+
+			memcpy(rb->outbuf, cmp_data, cmp_ondisk->orig_size);
+			break;
+		case REORDER_BUFFER_LZ4:
+			/* LZ4 regular decompression */
+
+			/*
+			 * Make sure the output reorder buffer has enough space to store
+			 * decompressed data.
+			 */
+			ReorderBufferSerializeReserve(rb, cmp_ondisk->orig_size);
+
+			decBytes = LZ4_decompress_safe(cmp_data,
+										   rb->outbuf,
+										   cmp_ondisk->size - sizeof(ReorderBufferCompressDiskChange),
+										   cmp_ondisk->orig_size);
+
+			Assert(decBytes == cmp_ondisk->orig_size);
+
+			if (decBytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed LZ4 data is corrupt")));
+			else if (decBytes != cmp_ondisk->orig_size)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("decompressed LZ4 data size differs from original size")));
+			break;
+		case REORDER_BUFFER_LZ4_STREAMING:
+			/* LZ4 streaming decompression */
+			/*
+			 * Allocate LZ4 resources in ReorderBuffer's memory context.
+			 */
+			if (txn->lz4_out_buf == NULL)
+			{
+				MemoryContext oldcontext = MemoryContextSwitchTo(rb->context);
+
+				txn->lz4_out_buf = (char *) palloc0(LZ4_RING_BUFFER_SIZE);
+				txn->lz4_stream_decode = LZ4_createStreamDecode();
+
+				MemoryContextSwitchTo(oldcontext);
+			}
+
+			/* Ring buffer offset wraparound */
+			if ((txn->lz4_out_buf_offset + cmp_ondisk->orig_size) > LZ4_RING_BUFFER_SIZE)
+				txn->lz4_out_buf_offset = 0;
+
+			/* Get the pointer of the current entry in the ring buffer */
+			lz4_out_bufPtr = txn->lz4_out_buf + txn->lz4_out_buf_offset;
+
+			decBytes = LZ4_decompress_safe_continue(txn->lz4_stream_decode,
+													cmp_data,
+													lz4_out_bufPtr,
+													cmp_ondisk->size - sizeof(ReorderBufferCompressDiskChange),
+													cmp_ondisk->orig_size);
+
+			Assert(decBytes == cmp_ondisk->orig_size);
+
+			if (decBytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed LZ4 data is corrupt")));
+			else if (decBytes != cmp_ondisk->orig_size)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("decompressed LZ4 data size differs from original size")));
+			/*
+			 * Make sure the output reorder buffer has enough space to store
+			 * decompressed data.
+			 */
+			ReorderBufferSerializeReserve(rb, cmp_ondisk->orig_size);
+
+			memcpy(rb->outbuf, lz4_out_bufPtr, decBytes);
+
+			/* Move the output ring buffer offset */
+			txn->lz4_out_buf_offset += decBytes;
+			break;
+	}
+
+	pfree(cmp_data);
+	pfree(cmp_header);
+
+	file->curOffset += readBytes;
+
+	return true;
+}
+#else
+static bool
+ReadOndiskBufferChange(ReorderBuffer *rb, TXNEntryFile *file, XLogSegNo *segno)
+{
+	int			readBytes;
+	ReorderBufferDiskChange *ondisk;
+
+	/*
+	 * Read the statically sized part of a change which has information about
+	 * the total size. If we couldn't read a record, we're at the end of this
+	 * file.
+	 */
+	ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
+	readBytes = FileRead(file->vfd, rb->outbuf,
+						 sizeof(ReorderBufferDiskChange),
+						 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);
+
+	/* eof */
+	if (readBytes == 0)
+	{
+		FileClose(file->vfd);
+		(*segno)++;
+		return false;
+	}
+	else if (readBytes < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: %m")));
+	else if (readBytes != sizeof(ReorderBufferDiskChange))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
+						readBytes,
+						(uint32) sizeof(ReorderBufferDiskChange))));
+
+	file->curOffset += readBytes;
+
+	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+	ReorderBufferSerializeReserve(rb,
+								  sizeof(ReorderBufferDiskChange) + ondisk->size);
+	ondisk = (ReorderBufferDiskChange *) rb->outbuf;
+
+	readBytes = FileRead(file->vfd,
+						 rb->outbuf + sizeof(ReorderBufferDiskChange),
+						 ondisk->size - sizeof(ReorderBufferDiskChange),
+						 file->curOffset,
+						 WAIT_EVENT_REORDER_BUFFER_READ);
+
+	if (readBytes < 0)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: %m")));
+	else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
+						readBytes,
+						(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));
+
+	file->curOffset += readBytes;
+
+	return true;
+}
+#endif
+
 /*
  * Convert change from its on-disk format to in-memory format and queue it onto
  * the TXN's ->changes list.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 851a001c8b..2ee35e5f7a 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -9,6 +9,10 @@
 #ifndef REORDERBUFFER_H
 #define REORDERBUFFER_H
 
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
 #include "access/htup_details.h"
 #include "lib/ilist.h"
 #include "lib/pairingheap.h"
@@ -422,8 +426,26 @@ typedef struct ReorderBufferTXN
 	 * Private data pointer of the output plugin.
 	 */
 	void	   *output_plugin_private;
+#ifdef USE_LZ4
+	LZ4_stream_t *lz4_stream;
+	LZ4_streamDecode_t *lz4_stream_decode;
+	/* LZ4 in/out ring buffers used for streaming compression */
+	char	   *lz4_in_buf;
+	int			lz4_in_buf_offset;
+	char	   *lz4_out_buf;
+	int			lz4_out_buf_offset;
+#endif
 } ReorderBufferTXN;
 
+#ifdef USE_LZ4
+/*
+ * We use a fairly small LZ4 ring buffer size (64kB). Using a larger buffer
+ * size provide better compression ratio, but as long as we have to allocate
+ * two LZ4 ring buffers per ReorderBufferTXN created, we should keep it small.
+ */
+#define LZ4_RING_BUFFER_SIZE (64 * 1024)
+#endif
+
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
 typedef struct ReorderBuffer ReorderBuffer;
 
-- 
2.43.0

