------- Original Message -------
On Wednesday, March 29th, 2023 at 12:02 AM, Tomas Vondra 
<tomas.von...@enterprisedb.com> wrote:


> 
> 
> On 3/28/23 18:07, gkokola...@pm.me wrote:
> 
> > ------- Original Message -------
> > On Friday, March 24th, 2023 at 10:30 AM, gkokola...@pm.me gkokola...@pm.me 
> > wrote:
> > 
> > > ------- Original Message -------
> > > On Thursday, March 23rd, 2023 at 6:10 PM, Tomas Vondra 
> > > tomas.von...@enterprisedb.com wrote:
> > > 
> > > > This leaves the empty-data issue (which we have a fix for) and the
> > > > switch to LZ4F. And then the zstd part.
> > > 
> > > Please expect a patch for the switch to frames shortly.
> > 
> > Please find the expected patch attached. Note that the bulk of the
> > patch is code unification, variable renaming to something more
> > appropriate, and comment addition. These are changes that are not
> > strictly necessary to switch to LZ4F. I do believe that they are
> > essential for code hygiene after the switch and that they belong
> > in the same commit.
> 
> 
> I think the patch is fine, but I'm wondering if the renames shouldn't go
> a bit further. It removes references to LZ4File struct, but there's a
> bunch of functions with the LZ4File_ prefix. Why not simply use the LZ4_
> prefix? We don't have GzipFile either.
> 
> Sure, it might be a bit confusing because lz4.h uses LZ4_ prefix, but
> then we probably should not define LZ4_compressor_init ...

This is a good point. The initial thought was that since lz4.h is no
longer included, such ambiguity would not arise. In v2 of the patch the
function is renamed to `LZ4State_compression_init`, since this name
better describes its purpose: it initializes the LZ4State for
compression.
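
For readers following along, the deferred header write that the Compressor
API relies on after this init can be sketched roughly as below. This is a
simplified stand-in and not the patch code: `SketchState`, `Sink`, and the
`sketch_*` functions are hypothetical names, no LZ4 calls are made, and the
"HDR:" and ":END" strings merely stand in for the frame header and footer.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for LZ4State: only the fields this sketch needs. */
typedef struct SketchState
{
    char        buffer[32];         /* holds the prepared header */
    size_t      compressedlen;      /* number of valid bytes in buffer */
    bool        needs_header_flush; /* header prepared but not yet written */
} SketchState;

/* Hypothetical in-memory output sink, standing in for cs->writeF(). */
typedef struct Sink
{
    char        data[256];
    size_t      len;
} Sink;

static void
sink_write(Sink *out, const void *src, size_t n)
{
    assert(out->len + n <= sizeof(out->data));
    memcpy(out->data + out->len, src, n);
    out->len += n;
}

/* Prepare the header in memory only; nothing reaches the sink yet. */
static void
sketch_init(SketchState *st)
{
    static const char header[] = "HDR:";    /* placeholder frame header */

    st->compressedlen = sizeof(header) - 1;
    memcpy(st->buffer, header, st->compressedlen);
    st->needs_header_flush = true;
}

/* Write the prepared header exactly once. */
static void
sketch_flush_header(SketchState *st, Sink *out)
{
    if (st->needs_header_flush)
    {
        sink_write(out, st->buffer, st->compressedlen);
        st->needs_header_flush = false;
    }
}

/* The first write flushes the header; real code would compress the data. */
static void
sketch_write(SketchState *st, Sink *out, const void *data, size_t n)
{
    sketch_flush_header(st, out);
    sink_write(out, data, n);
}

/* With no prior writes (empty relation) the header is still emitted. */
static void
sketch_end(SketchState *st, Sink *out)
{
    sketch_flush_header(st, out);
    sink_write(out, ":END", 4);             /* placeholder frame footer */
}
```

Calling `sketch_init` followed directly by `sketch_end` still yields a
header and a footer, which is the empty-data case the flag exists for.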

As for the LZ4File_ prefix, I have no objections. Please find the
prefix changed to LZ4Stream_. For the record, the word 'File' is not
unique to the lz4 implementation. The common data structure used by
the API in compress_io.h is:

   typedef struct CompressFileHandle CompressFileHandle; 

The public functions for this API are named:

  InitCompressFileHandle
  InitDiscoverCompressFileHandle
  EndCompressFileHandle

And within InitCompressFileHandle the pattern is:

    if (compression_spec.algorithm == PG_COMPRESSION_NONE)
        InitCompressFileHandleNone(CFH, compression_spec);
    else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
        InitCompressFileHandleGzip(CFH, compression_spec);
    else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
        InitCompressFileHandleLZ4(CFH, compression_spec);

A prefix was felt to be required because of the inclusion of the 'lz4.h'
header, where naming functions as 'LZ4_' would be wrong. The prefix
'LZ4File_' seemed to be in line with the naming of the rest of
the relevant functions and structures. The other compression methods,
gzip and none, did not face the same issue.

To conclude, I think that having a prefix is slightly preferable
to not having one. Since the prefix `LZ4File_` is not desired,
I propose `LZ4Stream_` in v2.

I will not object to dismissing this argument and dropping `File` from
the prefix, if so requested.

> 
> Also, maybe the comments shouldn't use "File API" when compress_io.c
> calls that "Compressed stream API".

Done.

Cheers,
//Georgios

> 
> 
> regards
> 
> --
> Tomas Vondra
> EnterpriseDB: http://www.enterprisedb.com
> The Enterprise PostgreSQL Company
From b17b60cc1ff608f85c6c75ab19ad40c0863cfa93 Mon Sep 17 00:00:00 2001
From: Georgios Kokolatos <gkokola...@pm.me>
Date: Fri, 31 Mar 2023 09:16:52 +0000
Subject: [PATCH v2] Use LZ4 frames in pg_dump's compressor API.

This change allows for greater compaction of data, especially so in very narrow
relations, by avoiding at least a compaction header and footer per row. Since
LZ4 frames are now used by both compression APIs, some code deduplication
opportunities have become obvious and are also implemented.

While at it, rename LZ4File* functions to LZ4Stream* to improve readability.

Reported by: Justin Pryzby
---
 src/bin/pg_dump/compress_lz4.c | 420 +++++++++++++++++++++------------
 1 file changed, 275 insertions(+), 145 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index fc2f4e116d..7023b11a2c 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -17,7 +17,6 @@
 #include "compress_lz4.h"
 
 #ifdef USE_LZ4
-#include <lz4.h>
 #include <lz4frame.h>
 
 /*
@@ -29,133 +28,286 @@
 #endif
 
 /*----------------------
- * Compressor API
- *----------------------
+ * Common to both APIs
  */
 
-typedef struct LZ4CompressorState
+/*
+ * State used for LZ4 (de)compression by both APIs.
+ */
+typedef struct LZ4State
 {
-	char	   *outbuf;
-	size_t		outsize;
-} LZ4CompressorState;
+	/*
+	 * Used by the Stream API to keep track of the file stream.
+	 */
+	FILE	   *fp;
+
+	LZ4F_preferences_t prefs;
+
+	LZ4F_compressionContext_t	ctx;
+	LZ4F_decompressionContext_t	dtx;
+
+	/*
+	 * Used by the Stream API's lazy initialization.
+	 */
+	bool		inited;
+
+	/*
+	 * Used by the Stream API to distinguish between compression
+	 * and decompression operations.
+	 */
+	bool		compressing;
+
+	/*
+	 * Used by the Compressor API to mark if the compression
+	 * headers have been written after initialization.
+	 */
+	bool		needs_header_flush;
+
+	size_t		buflen;
+	char	   *buffer;
+
+	/*
+	 * Used by the Stream API to store already uncompressed
+	 * data that the caller has not consumed.
+	 */
+	size_t		overflowalloclen;
+	size_t		overflowlen;
+	char	   *overflowbuf;
+
+	/*
+	 * Used by both APIs to keep track of the compressed
+	 * data length stored in the buffer.
+	 */
+	size_t		compressedlen;
+
+	/*
+	 * Used by both APIs to keep track of error codes.
+	 */
+	size_t		errcode;
+} LZ4State;
+
+/*
+ * Initialize the required LZ4State members for compression. Write the LZ4 frame
+ * header in a buffer keeping track of its length. Users of this function can
+ * choose when and how to write the header to a file stream.
+ *
+ * Returns true on success. In case of a failure returns false, and stores the
+ * error code in state->errcode.
+ */
+static bool
+LZ4State_compression_init(LZ4State *state)
+{
+	size_t		status;
+
+	state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
+
+	/*
+	 * LZ4F_compressBegin requires a buffer that is greater or equal to
+	 * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
+	 */
+	if (state->buflen < LZ4F_HEADER_SIZE_MAX)
+		state->buflen = LZ4F_HEADER_SIZE_MAX;
+
+	status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
+	if (LZ4F_isError(status))
+	{
+		state->errcode = status;
+		return false;
+	}
+
+	state->buffer = pg_malloc(state->buflen);
+	status = LZ4F_compressBegin(state->ctx,
+								state->buffer, state->buflen,
+							   &state->prefs);
+	if (LZ4F_isError(status))
+	{
+		state->errcode = status;
+		return false;
+	}
+
+	state->compressedlen = status;
+
+	return true;
+}
+
+/*----------------------
+ * Compressor API
+ *----------------------
+ */
 
 /* Private routines that support LZ4 compressed data I/O */
-static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs);
-static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
-								  const void *data, size_t dLen);
-static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
 
 static void
 ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
-	LZ4_streamDecode_t lz4StreamDecode;
-	char	   *buf;
-	char	   *decbuf;
-	size_t		buflen;
-	size_t		cnt;
-
-	buflen = DEFAULT_IO_BUFFER_SIZE;
-	buf = pg_malloc(buflen);
-	decbuf = pg_malloc(buflen);
+	size_t		r;
+	size_t		readbuflen;
+	char	   *outbuf;
+	char	   *readbuf;
+	LZ4F_decompressionContext_t ctx = NULL;
+	LZ4F_decompressOptions_t	dec_opt;
+	LZ4F_errorCode_t			status;
+
+	memset(&dec_opt, 0, sizeof(dec_opt));
+	status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
+	if (LZ4F_isError(status))
+		pg_fatal("could not create LZ4 decompression context: %s",
+				 LZ4F_getErrorName(status));
+
+	outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
+	readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
+	readbuflen = DEFAULT_IO_BUFFER_SIZE;
+	while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
+	{
+		char	   *readp;
+		char	   *readend;
 
-	LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
+		/* Process one chunk */
+		readp = readbuf;
+		readend = readbuf + r;
+		while (readp < readend)
+		{
+			size_t		out_size = DEFAULT_IO_BUFFER_SIZE;
+			size_t		read_size = readend - readp;
 
-	while ((cnt = cs->readF(AH, &buf, &buflen)))
-	{
-		int			decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
-															buf, decbuf,
-															cnt, buflen);
+			memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
+			status = LZ4F_decompress(ctx, outbuf, &out_size,
+									 readp, &read_size, &dec_opt);
+			if (LZ4F_isError(status))
+				pg_fatal("could not decompress: %s",
+						 LZ4F_getErrorName(status));
 
-		ahwrite(decbuf, 1, decBytes, AH);
+			ahwrite(outbuf, 1, out_size, AH);
+			readp += read_size;
+		}
 	}
 
-	pg_free(buf);
-	pg_free(decbuf);
+	pg_free(outbuf);
+	pg_free(readbuf);
+
+	status = LZ4F_freeDecompressionContext(ctx);
+	if (LZ4F_isError(status))
+		pg_fatal("could not free LZ4 decompression context: %s",
+				 LZ4F_getErrorName(status));
 }
 
 static void
 WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 					  const void *data, size_t dLen)
 {
-	LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private_data;
-	size_t		compressed;
-	size_t		requiredsize = LZ4_compressBound(dLen);
+	LZ4State	   *state = (LZ4State *) cs->private_data;
+	size_t			remaining = dLen;
+	size_t			status;
+	size_t			chunk;
 
-	if (requiredsize > LZ4cs->outsize)
+	/* Write the header if not yet written. */
+	if (state->needs_header_flush)
 	{
-		LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
-		LZ4cs->outsize = requiredsize;
+		cs->writeF(AH, state->buffer, state->compressedlen);
+		state->needs_header_flush = false;
 	}
 
-	compressed = LZ4_compress_default(data, LZ4cs->outbuf,
-									  dLen, LZ4cs->outsize);
+	while (remaining > 0)
+	{
+
+		if (remaining > DEFAULT_IO_BUFFER_SIZE)
+			chunk = DEFAULT_IO_BUFFER_SIZE;
+		else
+			chunk = remaining;
+
+		remaining -= chunk;
+		status = LZ4F_compressUpdate(state->ctx,
+									 state->buffer, state->buflen,
+									 data, chunk, NULL);
+
+		if (LZ4F_isError(status))
+			pg_fatal("failed to LZ4 compress data: %s",
+					 LZ4F_getErrorName(status));
 
-	if (compressed <= 0)
-		pg_fatal("failed to LZ4 compress data");
+		cs->writeF(AH, state->buffer, status);
 
-	cs->writeF(AH, LZ4cs->outbuf, compressed);
+		data = ((char *) data) + chunk;
+	}
 }
 
 static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
-	LZ4CompressorState *LZ4cs;
-
-	LZ4cs = (LZ4CompressorState *) cs->private_data;
-	if (LZ4cs)
-	{
-		pg_free(LZ4cs->outbuf);
-		pg_free(LZ4cs);
-		cs->private_data = NULL;
-	}
+	LZ4State	   *state = (LZ4State *) cs->private_data;
+	size_t			status;
+
+	/* Nothing needs to be done */
+	if (!state)
+		return;
+
+	/*
+	 * Write the header if not yet written. The caller is not required to
+	 * call writeData if the relation does not contain any data. Thus it is
+	 * possible to reach here without having flushed the header. Do it before
+	 * ending the compression.
+	 */
+	if (state->needs_header_flush)
+		cs->writeF(AH, state->buffer, state->compressedlen);
+
+	status = LZ4F_compressEnd(state->ctx,
+							  state->buffer, state->buflen,
+							  NULL);
+	if (LZ4F_isError(status))
+		pg_fatal("failed to end compression: %s",
+				 LZ4F_getErrorName(status));
+
+	cs->writeF(AH, state->buffer, status);
+
+	status = LZ4F_freeCompressionContext(state->ctx);
+	if (LZ4F_isError(status))
+		pg_fatal("failed to end compression: %s",
+				 LZ4F_getErrorName(status));
+
+	pg_free(state->buffer);
+	pg_free(state);
+
+	cs->private_data = NULL;
 }
 
-
 /*
  * Public routines that support LZ4 compressed data I/O
  */
 void
 InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
 {
+	LZ4State	   *state;
+
 	cs->readData = ReadDataFromArchiveLZ4;
 	cs->writeData = WriteDataToArchiveLZ4;
 	cs->end = EndCompressorLZ4;
 
 	cs->compression_spec = compression_spec;
 
-	/* Will be lazy init'd */
-	cs->private_data = pg_malloc0(sizeof(LZ4CompressorState));
+	/*
+	 * Read operations have access to the whole input. No state needs
+	 * to be carried between calls.
+	 */
+	if (cs->readF)
+		return;
+
+	state = pg_malloc0(sizeof(*state));
+	if (cs->compression_spec.level >= 0)
+		state->prefs.compressionLevel = cs->compression_spec.level;
+
+	if (!LZ4State_compression_init(state))
+		pg_fatal("could not initialize LZ4 compression: %s",
+				 LZ4F_getErrorName(state->errcode));
+
+	/* Remember that the header has not been written. */
+	state->needs_header_flush = true;
+	cs->private_data = state;
 }
 
 /*----------------------
- * Compress File API
+ * Compress Stream API
  *----------------------
  */
 
-/*
- * State needed for LZ4 (de)compression using the CompressFileHandle API.
- */
-typedef struct LZ4File
-{
-	FILE	   *fp;
-
-	LZ4F_preferences_t prefs;
-
-	LZ4F_compressionContext_t ctx;
-	LZ4F_decompressionContext_t dtx;
-
-	bool		inited;
-	bool		compressing;
-
-	size_t		buflen;
-	char	   *buffer;
-
-	size_t		overflowalloclen;
-	size_t		overflowlen;
-	char	   *overflowbuf;
-
-	size_t		errcode;
-} LZ4File;
 
 /*
  * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
@@ -163,17 +315,17 @@ typedef struct LZ4File
  * is reached.
  */
 static bool
-LZ4File_eof(CompressFileHandle *CFH)
+LZ4Stream_eof(CompressFileHandle *CFH)
 {
-	LZ4File    *fs = (LZ4File *) CFH->private_data;
+	LZ4State    *fs = (LZ4State *) CFH->private_data;
 
 	return fs->overflowlen == 0 && feof(fs->fp);
 }
 
 static const char *
-LZ4File_get_error(CompressFileHandle *CFH)
+LZ4Stream_get_error(CompressFileHandle *CFH)
 {
-	LZ4File    *fs = (LZ4File *) CFH->private_data;
+	LZ4State    *fs = (LZ4State *) CFH->private_data;
 	const char *errmsg;
 
 	if (LZ4F_isError(fs->errcode))
@@ -185,7 +337,7 @@ LZ4File_get_error(CompressFileHandle *CFH)
 }
 
 /*
- * Prepare an already alloc'ed LZ4File struct for subsequent calls (either
+ * Prepare an already alloc'ed LZ4State struct for subsequent calls (either
  * compression or decompression).
  *
  * It creates the necessary contexts for the operations. When compressing data
@@ -196,7 +348,7 @@ LZ4File_get_error(CompressFileHandle *CFH)
  * error code in fs->errcode.
  */
 static bool
-LZ4File_init(LZ4File *fs, int size, bool compressing)
+LZ4Stream_init(LZ4State *fs, int size, bool compressing)
 {
 	size_t		status;
 
@@ -209,33 +361,11 @@ LZ4File_init(LZ4File *fs, int size, bool compressing)
 	/* When compressing, write LZ4 header to the output stream. */
 	if (fs->compressing)
 	{
-		fs->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &fs->prefs);
 
-		/*
-		 * LZ4F_compressBegin requires a buffer that is greater or equal to
-		 * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
-		 */
-		if (fs->buflen < LZ4F_HEADER_SIZE_MAX)
-			fs->buflen = LZ4F_HEADER_SIZE_MAX;
-
-		status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION);
-		if (LZ4F_isError(status))
-		{
-			fs->errcode = status;
-			return false;
-		}
-
-		fs->buffer = pg_malloc(fs->buflen);
-		status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen,
-									&fs->prefs);
-
-		if (LZ4F_isError(status))
-		{
-			fs->errcode = status;
+		if (!LZ4State_compression_init(fs))
 			return false;
-		}
 
-		if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+		if (fwrite(fs->buffer, 1, fs->compressedlen, fs->fp) != fs->compressedlen)
 		{
 			errno = (errno) ? errno : ENOSPC;
 			return false;
@@ -272,7 +402,7 @@ LZ4File_init(LZ4File *fs, int size, bool compressing)
  * the 'ptr' buffer), or 0 if the overflow buffer is empty.
  */
 static int
-LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
+LZ4Stream_read_overflow(LZ4State *fs, void *ptr, int size, bool eol_flag)
 {
 	char	   *p;
 	int			readlen = 0;
@@ -306,7 +436,7 @@ LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
  * char if found first when the eol_flag is set. It is possible that the
  * decompressed output generated by reading any compressed input via the
  * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
- * at an overflow buffer within LZ4File. Of course, when the function is
+ * at an overflow buffer within LZ4State. Of course, when the function is
  * called, it will first try to consume any decompressed content already
  * present in the overflow buffer, before decompressing new content.
  *
@@ -314,7 +444,7 @@ LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
  * buffer, or -1 in case of error.
  */
 static int
-LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
+LZ4Stream_read_internal(LZ4State *fs, void *ptr, int ptrsize, bool eol_flag)
 {
 	int			dsize = 0;
 	int			rsize;
@@ -324,7 +454,7 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
 	void	   *readbuf;
 
 	/* Lazy init */
-	if (!LZ4File_init(fs, size, false /* decompressing */ ))
+	if (!LZ4Stream_init(fs, size, false /* decompressing */ ))
 		return -1;
 
 	/* Verify that there is enough space in the outbuf */
@@ -335,7 +465,7 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
 	}
 
 	/* use already decompressed content if available */
-	dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag);
+	dsize = LZ4Stream_read_overflow(fs, ptr, size, eol_flag);
 	if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
 		return dsize;
 
@@ -423,14 +553,14 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
  * Compress size bytes from ptr and write them to the stream.
  */
 static bool
-LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
-	LZ4File    *fs = (LZ4File *) CFH->private_data;
+	LZ4State    *fs = (LZ4State *) CFH->private_data;
 	size_t		status;
 	int			remaining = size;
 
 	/* Lazy init */
-	if (!LZ4File_init(fs, size, true))
+	if (!LZ4Stream_init(fs, size, true))
 		return false;
 
 	while (remaining > 0)
@@ -461,13 +591,13 @@ LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
  * fread() equivalent implementation for LZ4 compressed files.
  */
 static bool
-LZ4File_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
+LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
 {
-	LZ4File    *fs = (LZ4File *) CFH->private_data;
+	LZ4State    *fs = (LZ4State *) CFH->private_data;
 	int			ret;
 
-	if ((ret = LZ4File_read_internal(fs, ptr, size, false)) < 0)
-		pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+	if ((ret = LZ4Stream_read_internal(fs, ptr, size, false)) < 0)
+		pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
 
 	if (rsize)
 		*rsize = (size_t) ret;
@@ -479,15 +609,15 @@ LZ4File_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
  * fgetc() equivalent implementation for LZ4 compressed files.
  */
 static int
-LZ4File_getc(CompressFileHandle *CFH)
+LZ4Stream_getc(CompressFileHandle *CFH)
 {
-	LZ4File    *fs = (LZ4File *) CFH->private_data;
+	LZ4State    *fs = (LZ4State *) CFH->private_data;
 	unsigned char c;
 
-	if (LZ4File_read_internal(fs, &c, 1, false) <= 0)
+	if (LZ4Stream_read_internal(fs, &c, 1, false) <= 0)
 	{
-		if (!LZ4File_eof(CFH))
-			pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+		if (!LZ4Stream_eof(CFH))
+			pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
 		else
 			pg_fatal("could not read from input file: end of file");
 	}
@@ -499,14 +629,14 @@ LZ4File_getc(CompressFileHandle *CFH)
  * fgets() equivalent implementation for LZ4 compressed files.
  */
 static char *
-LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH)
+LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
 {
-	LZ4File    *fs = (LZ4File *) CFH->private_data;
+	LZ4State    *fs = (LZ4State *) CFH->private_data;
 	int			ret;
 
-	ret = LZ4File_read_internal(fs, ptr, size, true);
-	if (ret < 0 || (ret == 0 && !LZ4File_eof(CFH)))
-		pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+	ret = LZ4Stream_read_internal(fs, ptr, size, true);
+	if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
+		pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
 
 	/* Done reading */
 	if (ret == 0)
@@ -520,10 +650,10 @@ LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH)
  * remaining content and/or generated footer from the LZ4 API.
  */
 static bool
-LZ4File_close(CompressFileHandle *CFH)
+LZ4Stream_close(CompressFileHandle *CFH)
 {
 	FILE	   *fp;
-	LZ4File    *fs = (LZ4File *) CFH->private_data;
+	LZ4State    *fs = (LZ4State *) CFH->private_data;
 	size_t		status;
 
 	fp = fs->fp;
@@ -564,11 +694,11 @@ LZ4File_close(CompressFileHandle *CFH)
 }
 
 static bool
-LZ4File_open(const char *path, int fd, const char *mode,
+LZ4Stream_open(const char *path, int fd, const char *mode,
 			 CompressFileHandle *CFH)
 {
 	FILE	   *fp;
-	LZ4File    *lz4fp = (LZ4File *) CFH->private_data;
+	LZ4State    *lz4fp = (LZ4State *) CFH->private_data;
 
 	if (fd >= 0)
 		fp = fdopen(fd, mode);
@@ -586,7 +716,7 @@ LZ4File_open(const char *path, int fd, const char *mode,
 }
 
 static bool
-LZ4File_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
 {
 	char	   *fname;
 	int			save_errno;
@@ -609,17 +739,17 @@ void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 						  const pg_compress_specification compression_spec)
 {
-	LZ4File    *lz4fp;
-
-	CFH->open_func = LZ4File_open;
-	CFH->open_write_func = LZ4File_open_write;
-	CFH->read_func = LZ4File_read;
-	CFH->write_func = LZ4File_write;
-	CFH->gets_func = LZ4File_gets;
-	CFH->getc_func = LZ4File_getc;
-	CFH->eof_func = LZ4File_eof;
-	CFH->close_func = LZ4File_close;
-	CFH->get_error_func = LZ4File_get_error;
+	LZ4State    *lz4fp;
+
+	CFH->open_func = LZ4Stream_open;
+	CFH->open_write_func = LZ4Stream_open_write;
+	CFH->read_func = LZ4Stream_read;
+	CFH->write_func = LZ4Stream_write;
+	CFH->gets_func = LZ4Stream_gets;
+	CFH->getc_func = LZ4Stream_getc;
+	CFH->eof_func = LZ4Stream_eof;
+	CFH->close_func = LZ4Stream_close;
+	CFH->get_error_func = LZ4Stream_get_error;
 
 	CFH->compression_spec = compression_spec;
 	lz4fp = pg_malloc0(sizeof(*lz4fp));
-- 
2.34.1
