From a6a465cf060e9e0b25ee3389bb57657be260f66b Mon Sep 17 00:00:00 2001
From: Filip Janus <fjanus@redhat.com>
Date: Wed, 16 Apr 2025 12:03:03 +0200
Subject: [PATCH 1/2] This commit adds support for temporary files compression,
 it can be used only for hashjoins now.

It also adds a GUC parameter, temp_file_compression, that enables this functionality.
For now, it supports just the lz4 and pglz algorithms; zstd support
could also be implemented in the future.

It implements just one shared working buffer for compression and decompression
to avoid wasting memory. The buffer is allocated in the top memory context.
---
 src/Makefile.global.in                        |   1 +
 src/backend/access/gist/gistbuildbuffers.c    |   2 +-
 src/backend/backup/backup_manifest.c          |   2 +-
 src/backend/executor/nodeHashjoin.c           |   2 +-
 src/backend/storage/file/buffile.c            | 219 +++++++++++++++++-
 src/backend/utils/misc/guc_tables.c           |  24 ++
 src/backend/utils/misc/postgresql.conf.sample |   1 +
 src/backend/utils/sort/logtape.c              |   2 +-
 src/backend/utils/sort/tuplestore.c           |   2 +-
 src/include/storage/buffile.h                 |  13 +-
 10 files changed, 252 insertions(+), 16 deletions(-)

diff --git a/src/Makefile.global.in b/src/Makefile.global.in
index 6722fbdf365..6ff67bda17c 100644
--- a/src/Makefile.global.in
+++ b/src/Makefile.global.in
@@ -201,6 +201,7 @@ with_liburing	= @with_liburing@
 with_libxml	= @with_libxml@
 with_libxslt	= @with_libxslt@
 with_llvm	= @with_llvm@
+with_lz4	= @with_lz4@
 with_system_tzdata = @with_system_tzdata@
 with_uuid	= @with_uuid@
 with_zlib	= @with_zlib@
diff --git a/src/backend/access/gist/gistbuildbuffers.c b/src/backend/access/gist/gistbuildbuffers.c
index 0707254d18e..9cc371f47fe 100644
--- a/src/backend/access/gist/gistbuildbuffers.c
+++ b/src/backend/access/gist/gistbuildbuffers.c
@@ -54,7 +54,7 @@ gistInitBuildBuffers(int pagesPerBuffer, int levelStep, int maxLevel)
 	 * Create a temporary file to hold buffer pages that are swapped out of
 	 * memory.
 	 */
-	gfbb->pfile = BufFileCreateTemp(false);
+	gfbb->pfile = BufFileCreateTemp(false, false);
 	gfbb->nFileBlocks = 0;
 
 	/* Initialize free page management. */
diff --git a/src/backend/backup/backup_manifest.c b/src/backend/backup/backup_manifest.c
index 22e2be37c95..c9f7daa1497 100644
--- a/src/backend/backup/backup_manifest.c
+++ b/src/backend/backup/backup_manifest.c
@@ -65,7 +65,7 @@ InitializeBackupManifest(backup_manifest_info *manifest,
 		manifest->buffile = NULL;
 	else
 	{
-		manifest->buffile = BufFileCreateTemp(false);
+		manifest->buffile = BufFileCreateTemp(false, false);
 		manifest->manifest_ctx = pg_cryptohash_create(PG_SHA256);
 		if (pg_cryptohash_init(manifest->manifest_ctx) < 0)
 			elog(ERROR, "failed to initialize checksum of backup manifest: %s",
diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c
index 5661ad76830..384265ca74a 100644
--- a/src/backend/executor/nodeHashjoin.c
+++ b/src/backend/executor/nodeHashjoin.c
@@ -1434,7 +1434,7 @@ ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
 	{
 		MemoryContext oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
 
-		file = BufFileCreateTemp(false);
+		file = BufFileCreateCompressTemp(false);
 		*fileptr = file;
 
 		MemoryContextSwitchTo(oldctx);
diff --git a/src/backend/storage/file/buffile.c b/src/backend/storage/file/buffile.c
index 366d70d38a1..10da6308004 100644
--- a/src/backend/storage/file/buffile.c
+++ b/src/backend/storage/file/buffile.c
@@ -53,6 +53,18 @@
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "utils/resowner.h"
+#include "utils/memutils.h"
+
+#include "common/pg_lzcompress.h"
+#ifdef USE_LZ4
+#include <lz4.h>
+#endif
+
+#define NO_LZ4_SUPPORT() \
+	ereport(ERROR, \
+			(errcode(ERRCODE_FEATURE_NOT_SUPPORTED), \
+			 errmsg("compression method lz4 not supported"), \
+			 errdetail("This functionality requires the server to be built with lz4 support.")))
 
 /*
  * We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
@@ -62,6 +74,8 @@
 #define MAX_PHYSICAL_FILESIZE	0x40000000
 #define BUFFILE_SEG_SIZE		(MAX_PHYSICAL_FILESIZE / BLCKSZ)
 
+int temp_file_compression = TEMP_NONE_COMPRESSION;
+
 /*
  * This data structure represents a buffered file that consists of one or
  * more physical files (each accessed through a virtual file descriptor
@@ -95,7 +109,8 @@ struct BufFile
 	off_t		curOffset;		/* offset part of current pos */
 	int			pos;			/* next read/write position in buffer */
 	int			nbytes;			/* total # of valid bytes in buffer */
-
+	bool		compress;	/* whether this file uses compression */
+	char	   *cBuffer;	/* shared (de)compression work buffer */
 	/*
 	 * XXX Should ideally use PGIOAlignedBlock, but might need a way to avoid
 	 * wasting per-file alignment padding when some users create many files.
@@ -127,6 +142,8 @@ makeBufFileCommon(int nfiles)
 	file->curOffset = 0;
 	file->pos = 0;
 	file->nbytes = 0;
+	file->compress = false;
+   file->cBuffer = NULL;
 
 	return file;
 }
@@ -188,9 +205,17 @@ extendBufFile(BufFile *file)
  * Note: if interXact is true, the caller had better be calling us in a
  * memory context, and with a resource owner, that will survive across
  * transaction boundaries.
+ *
+ * If compress is true the temporary files will be compressed before
+ * writing on disk.
+ *
+ * Note: Compression does not support random access.  Only hash joins
+ * use it for now.  Any seek other than a seek to the beginning of the
+ * buffile will corrupt the temporary data offsets.
+ *
  */
 BufFile *
-BufFileCreateTemp(bool interXact)
+BufFileCreateTemp(bool interXact, bool compress)
 {
 	BufFile    *file;
 	File		pfile;
@@ -212,9 +237,47 @@ BufFileCreateTemp(bool interXact)
 	file = makeBufFile(pfile);
 	file->isInterXact = interXact;
 
+	if (temp_file_compression != TEMP_NONE_COMPRESSION)
+	{
+		file->compress = compress;
+	}
+
 	return file;
-}
 
+}
+/*
+ * Wrapper for BufFileCreateTemp.
+ * We want to limit the number of memory allocations for the compression
+ * buffer; one buffer for all compression operations is enough.
+ */
+BufFile *
+BufFileCreateCompressTemp(bool interXact){
+   static char * buff = NULL;
+   BufFile *tmpBufFile = BufFileCreateTemp(interXact, true);
+
+   if (buff == NULL && temp_file_compression != TEMP_NONE_COMPRESSION)
+   {
+		int size = 0;
+
+		switch (temp_file_compression)
+		{
+			case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+				size = LZ4_compressBound(BLCKSZ)+sizeof(int);
+#endif
+				break;
+			case TEMP_PGLZ_COMPRESSION:
+				size = pglz_maximum_compressed_size(BLCKSZ, BLCKSZ)+sizeof(int);
+				break;
+		}
+		/*
+		 * Persistent buffer for all temporary file compressions
+		 */
+		buff = MemoryContextAlloc(TopMemoryContext, size);
+	}
+	tmpBufFile->cBuffer = buff;
+	return tmpBufFile;
+}
 /*
  * Build the name for a given segment of a given BufFile.
  */
@@ -275,6 +338,7 @@ BufFileCreateFileSet(FileSet *fileset, const char *name)
 	file->files[0] = MakeNewFileSetSegment(file, 0);
 	file->readOnly = false;
 
+
 	return file;
 }
 
@@ -457,11 +521,75 @@ BufFileLoadBuffer(BufFile *file)
 	/*
 	 * Read whatever we can get, up to a full bufferload.
 	 */
-	file->nbytes = FileRead(thisfile,
+	if (!file->compress)
+	{
+
+		/*
+		* Read whatever we can get, up to a full bufferload.
+		*/
+		file->nbytes = FileRead(thisfile,
 							file->buffer.data,
-							sizeof(file->buffer.data),
+							sizeof(file->buffer),
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+	/*
+	 * Read and decompress data from the temporary file
+	 * The first reading loads size of the compressed block
+	 * Second reading loads compressed data
+	 */
+	} else {
+		int nread;
+		int nbytes;
+
+		nread = FileRead(thisfile,
+							&nbytes,
+							sizeof(nbytes),
+							file->curOffset,
+							WAIT_EVENT_BUFFILE_READ);
+		/* if not EOF let's continue */
+		if (nread > 0)
+		{
+			/* A long-lived buffer limits the number of memory allocations */
+			char * buff = file->cBuffer;
+
+			Assert(file->cBuffer != NULL);
+			/*
+			 * Read the compressed data; curOffset differs from pos here:
+			 * less data is read from disk than is returned to the caller,
+			 * so curOffset must be advanced based on the compressed size.
+			 */
+			file->curOffset+=sizeof(nbytes);
+
+			nread = FileRead(thisfile,
+							buff,
+							nbytes,
 							file->curOffset,
 							WAIT_EVENT_BUFFILE_READ);
+
+			switch (temp_file_compression)
+			{
+				case TEMP_LZ4_COMPRESSION:
+#ifdef USE_LZ4
+					file->nbytes = LZ4_decompress_safe(buff,
+						file->buffer.data,nbytes,sizeof(file->buffer));
+#endif
+					break;
+
+				case TEMP_PGLZ_COMPRESSION:
+					file->nbytes = pglz_decompress(buff,nbytes,
+						file->buffer.data,sizeof(file->buffer),false);
+					break;
+			}
+			file->curOffset += nread;
+
+			if (file->nbytes < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_DATA_CORRUPTED),
+						 errmsg_internal("compressed temporary data is corrupt")));
+		}
+
+	}
+
 	if (file->nbytes < 0)
 	{
 		file->nbytes = 0;
@@ -494,9 +622,61 @@ static void
 BufFileDumpBuffer(BufFile *file)
 {
 	int			wpos = 0;
-	int			bytestowrite;
+	int			bytestowrite = 0;
 	File		thisfile;
 
+
+	/* Save nbytes value because the size changes due to compression */
+	int nbytesOriginal = file->nbytes;
+
+	char * DataToWrite;
+	DataToWrite = file->buffer.data;
+
+	/*
+	 * Prepare the compressed data to write.
+	 * The size of the compressed block needs to be stored at the
+	 * beginning of the compressed data.
+	 */
+
+
+	if (file->compress) {
+		char * cData;
+		int cSize = 0;
+
+		Assert(file->cBuffer != NULL);
+		cData = file->cBuffer;
+
+		switch (temp_file_compression)
+			{
+			case TEMP_LZ4_COMPRESSION:
+				{
+#ifdef USE_LZ4
+				int cBufferSize = LZ4_compressBound(file->nbytes);
+				/*
+				* Using stream compression would lead to the slight improvement in
+				* compression ratio
+				*/
+				cSize = LZ4_compress_default(file->buffer.data,
+					cData + sizeof(int),file->nbytes, cBufferSize);
+#endif
+				break;
+				}
+			case TEMP_PGLZ_COMPRESSION:
+				cSize = pglz_compress(file->buffer.data,file->nbytes,
+					cData + sizeof(int),PGLZ_strategy_always);
+				break;
+			}
+
+
+		/*
+		 * Write the size of the compressed block in front of the data;
+		 * it determines how much data to read during decompression.
+		 */
+		memcpy(cData,&cSize,sizeof(int));
+		file->nbytes=cSize + sizeof(int);
+		DataToWrite = cData;
+	}
+
 	/*
 	 * Unlike BufFileLoadBuffer, we must dump the whole buffer even if it
 	 * crosses a component-file boundary; so we need a loop.
@@ -535,7 +715,7 @@ BufFileDumpBuffer(BufFile *file)
 			INSTR_TIME_SET_ZERO(io_start);
 
 		bytestowrite = FileWrite(thisfile,
-								 file->buffer.data + wpos,
+								 DataToWrite + wpos,
 								 bytestowrite,
 								 file->curOffset,
 								 WAIT_EVENT_BUFFILE_WRITE);
@@ -564,7 +744,19 @@ BufFileDumpBuffer(BufFile *file)
 	 * logical file position, ie, original value + pos, in case that is less
 	 * (as could happen due to a small backwards seek in a dirty buffer!)
 	 */
-	file->curOffset -= (file->nbytes - file->pos);
+
+
+	if (!file->compress)
+		file->curOffset -= (file->nbytes - file->pos);
+	else
+		if (nbytesOriginal - file->pos != 0)
+			/*
+			 * curOffset must also be corrected when compression is
+			 * enabled; use the original nbytes, not the compressed size.
+			 */
+			file->curOffset-=bytestowrite;
+
+
 	if (file->curOffset < 0)	/* handle possible segment crossing */
 	{
 		file->curFile--;
@@ -577,6 +769,7 @@ BufFileDumpBuffer(BufFile *file)
 	 */
 	file->pos = 0;
 	file->nbytes = 0;
+
 }
 
 /*
@@ -602,8 +795,14 @@ BufFileReadCommon(BufFile *file, void *ptr, size_t size, bool exact, bool eofOK)
 	{
 		if (file->pos >= file->nbytes)
 		{
-			/* Try to load more data into buffer. */
-			file->curOffset += file->pos;
+			/*
+			 * Try to load more data into the buffer.  For compressed
+			 * files, curOffset is advanced inside BufFileLoadBuffer,
+			 * since the stored (compressed) size differs from the
+			 * decompressed size.
+			 */
+			if (!file->compress)
+				file->curOffset += file->pos;
 			file->pos = 0;
 			file->nbytes = 0;
 			BufFileLoadBuffer(file);
diff --git a/src/backend/utils/misc/guc_tables.c b/src/backend/utils/misc/guc_tables.c
index 60b12446a1c..ae052640ac0 100644
--- a/src/backend/utils/misc/guc_tables.c
+++ b/src/backend/utils/misc/guc_tables.c
@@ -78,6 +78,7 @@
 #include "replication/syncrep.h"
 #include "storage/aio.h"
 #include "storage/bufmgr.h"
+#include "storage/buffile.h"
 #include "storage/bufpage.h"
 #include "storage/copydir.h"
 #include "storage/io_worker.h"
@@ -463,6 +464,18 @@ static const struct config_enum_entry default_toast_compression_options[] = {
 #endif
 	{NULL, 0, false}
 };
+/*
+ * Options for temp_file_compression; zstd support could be added
+ * as a future enhancement.
+ */
+static const struct config_enum_entry temp_file_compression_options[] = {
+	{"no", TEMP_NONE_COMPRESSION, false},
+	{"pglz", TEMP_PGLZ_COMPRESSION, false},
+#ifdef  USE_LZ4
+	{"lz4", TEMP_LZ4_COMPRESSION, false},
+#endif
+	{NULL, 0, false}
+};
 
 static const struct config_enum_entry wal_compression_options[] = {
 	{"pglz", WAL_COMPRESSION_PGLZ, false},
@@ -5058,6 +5071,17 @@ struct config_enum ConfigureNamesEnum[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"temp_file_compression", PGC_USERSET, CLIENT_CONN_STATEMENT,
+			gettext_noop("Sets the compression method for temporary files."),
+			NULL
+		},
+		&temp_file_compression,
+		TEMP_NONE_COMPRESSION,
+		temp_file_compression_options,
+		NULL, NULL, NULL
+	},
+
 	{
 		{"default_transaction_isolation", PGC_USERSET, CLIENT_CONN_STATEMENT,
 			gettext_noop("Sets the transaction isolation level of each new transaction."),
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index 34826d01380..77961a45d65 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -182,6 +182,7 @@
 
 #max_notify_queue_pages = 1048576	# limits the number of SLRU pages allocated
 					# for NOTIFY / LISTEN queue
+#temp_file_compression = 'no'	# enables temporary file compression
 
 #file_copy_method = copy	# the default is the first option
 					# 	copy
diff --git a/src/backend/utils/sort/logtape.c b/src/backend/utils/sort/logtape.c
index e529ceb8260..d862e22ef18 100644
--- a/src/backend/utils/sort/logtape.c
+++ b/src/backend/utils/sort/logtape.c
@@ -592,7 +592,7 @@ LogicalTapeSetCreate(bool preallocate, SharedFileSet *fileset, int worker)
 		lts->pfile = BufFileCreateFileSet(&fileset->fs, filename);
 	}
 	else
-		lts->pfile = BufFileCreateTemp(false);
+		lts->pfile = BufFileCreateTemp(false, false);
 
 	return lts;
 }
diff --git a/src/backend/utils/sort/tuplestore.c b/src/backend/utils/sort/tuplestore.c
index c9aecab8d66..ef85924cd21 100644
--- a/src/backend/utils/sort/tuplestore.c
+++ b/src/backend/utils/sort/tuplestore.c
@@ -860,7 +860,7 @@ tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
 			 */
 			oldcxt = MemoryContextSwitchTo(state->context->parent);
 
-			state->myfile = BufFileCreateTemp(state->interXact);
+			state->myfile = BufFileCreateTemp(state->interXact, false);
 
 			MemoryContextSwitchTo(oldcxt);
 
diff --git a/src/include/storage/buffile.h b/src/include/storage/buffile.h
index a2f4821f240..931a211038b 100644
--- a/src/include/storage/buffile.h
+++ b/src/include/storage/buffile.h
@@ -32,11 +32,22 @@
 
 typedef struct BufFile BufFile;
 
+typedef enum
+{
+	TEMP_NONE_COMPRESSION,
+	TEMP_PGLZ_COMPRESSION,
+	TEMP_LZ4_COMPRESSION
+} TempCompression;
+
+extern PGDLLIMPORT int temp_file_compression;
+
+
 /*
  * prototypes for functions in buffile.c
  */
 
-extern BufFile *BufFileCreateTemp(bool interXact);
+extern BufFile *BufFileCreateCompressTemp(bool interXact);
+extern BufFile *BufFileCreateTemp(bool interXact, bool compress);
 extern void BufFileClose(BufFile *file);
 pg_nodiscard extern size_t BufFileRead(BufFile *file, void *ptr, size_t size);
 extern void BufFileReadExact(BufFile *file, void *ptr, size_t size);
-- 
2.39.5 (Apple Git-154)

