On Sat, Feb 25, 2023 at 08:05:53AM -0600, Justin Pryzby wrote:
> On Fri, Feb 24, 2023 at 11:02:14PM -0600, Justin Pryzby wrote:
> > I have some fixes (attached) and questions while polishing the patch for
> > zstd compression.  The fixes are small and could be integrated with the
> > patch for zstd, but could be applied independently.
> 
> One more - WriteDataToArchiveGzip() says:

One more again.

The LZ4 path is using non-streaming mode, which compresses each block
without persistent state, giving poor compression for -Fc compared with
-Fp.  If the data is highly compressible, the difference can be orders
of magnitude.

$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -Z lz4 -Fp |wc -c
12351763
$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -Z lz4 -Fc |wc -c
21890708

That's not true for gzip:

$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -t t1 -Z gzip -Fc |wc -c
2118869
$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -t t1 -Z gzip -Fp |wc -c
2115832

The function ought to at least use streaming mode, so each block/row
isn't compressed in isolation.  003 is a simple patch to use
streaming mode, which improves the -Fc case:

$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -Z lz4 -Fc |wc -c
15178283

However, that still flushes the compression buffer, writing a block
header, for every row.  With a single-column table, pg_dump -Fc -Z lz4
still outputs ~10% *more* data than with no compression at all.  And
that's for compressible data.

$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -t t1 -Fc -Z lz4 |wc -c
12890296
$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -t t1 -Fc -Z none |wc -c
11890296

I think this should use the LZ4F API with frames, which are buffered to
avoid outputting a header for every single row.  The LZ4F format isn't
compatible with the LZ4 format, so (unlike changing to the streaming
API) that's not something we can change in a bugfix release.  I consider
this an Open Item.

With the LZ4F API in 004, -Fp and -Fc are essentially the same size
(like gzip).  (Oh, and the output is three times smaller, too.)

$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -t t1 -Z lz4 -Fp |wc -c
4155448
$ ./src/bin/pg_dump/pg_dump -h /tmp postgres -t t1 -Z lz4 -Fc |wc -c
4156548

-- 
Justin
From 3a980f956bf314fb161fbf0a76f62ed0c2c35bfe Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Fri, 24 Feb 2023 14:07:09 -0600
Subject: [PATCH 1/4] f!fixes for LZ4

---
 src/bin/pg_dump/compress_gzip.c |  8 --------
 src/bin/pg_dump/compress_io.h   |  2 +-
 src/bin/pg_dump/compress_lz4.c  | 12 ++++--------
 src/bin/pg_dump/pg_dump.c       |  2 --
 4 files changed, 5 insertions(+), 19 deletions(-)

diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 0af65afeb4e..52f41c2e58c 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -123,17 +123,9 @@ WriteDataToArchiveGzip(ArchiveHandle *AH, CompressorState *cs,
 		gzipcs->outbuf = pg_malloc(ZLIB_OUT_SIZE + 1);
 		gzipcs->outsize = ZLIB_OUT_SIZE;
 
-		/*
-		 * A level of zero simply copies the input one block at the time. This
-		 * is probably not what the user wanted when calling this interface.
-		 */
-		if (cs->compression_spec.level == 0)
-			pg_fatal("requested to compress the archive yet no level was specified");
-
 		if (deflateInit(zp, cs->compression_spec.level) != Z_OK)
 			pg_fatal("could not initialize compression library: %s", zp->msg);
 
-		/* Just be paranoid - maybe End is called after Start, with no Write */
 		zp->next_out = gzipcs->outbuf;
 		zp->avail_out = gzipcs->outsize;
 	}
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index bbde2693915..cdb15951ea9 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -172,7 +172,7 @@ struct CompressFileHandle
 extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec);
 
 /*
- * Initialize a compress file stream. Deffer the compression algorithm
+ * Initialize a compress file stream. Infer the compression algorithm
  * from 'path', either by examining its suffix or by appending the supported
  * suffixes in 'path'.
  */
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index fe1014e6e77..7dacfeae469 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -105,12 +105,8 @@ EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 	LZ4CompressorState *LZ4cs;
 
 	LZ4cs = (LZ4CompressorState *) cs->private_data;
-	if (LZ4cs)
-	{
-		pg_free(LZ4cs->outbuf);
-		pg_free(LZ4cs);
-		cs->private_data = NULL;
-	}
+	pg_free(LZ4cs->outbuf);
+	pg_free(LZ4cs);
 }
 
 
@@ -161,8 +157,8 @@ typedef struct LZ4File
 } LZ4File;
 
 /*
- * LZ4 equivalent to feof() or gzeof(). The end of file is reached if there
- * is no decompressed output in the overflow buffer and the end of the file
+ * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
+ * decompressed output in the overflow buffer and the end of the backing file
  * is reached.
  */
 static int
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 24ba936332d..ce2242195f3 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -733,8 +733,6 @@ main(int argc, char **argv)
 #ifdef HAVE_LIBZ
 		parse_compress_specification(PG_COMPRESSION_GZIP, NULL,
 									 &compression_spec);
-#else
-		/* Nothing to do in the default case */
 #endif
 	}
 
-- 
2.34.1

From 8e846443a7a3e5e2df37fabc917ae3964d6d1500 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 26 Feb 2023 17:47:09 -0600
Subject: [PATCH 2/4] f!fixes for LZ4 which also conflict with the ZSTD patch

---
 doc/src/sgml/ref/pg_dump.sgml    | 4 ++--
 src/bin/pg_dump/t/002_pg_dump.pl | 6 +++---
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 49d218905fb..6fbe49f7ede 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -331,7 +331,7 @@ PostgreSQL documentation
            can read. A directory format archive can be manipulated with
            standard Unix tools; for example, files in an uncompressed archive
            can be compressed with the <application>gzip</application> or
-           <application>lz4</application>tool.
+           <application>lz4</application> tools.
            This format is compressed by default using <literal>gzip</literal>
            and also supports parallel dumps.
           </para>
@@ -655,7 +655,7 @@ PostgreSQL documentation
        <para>
         Specify the compression method and/or the compression level to use.
         The compression method can be set to <literal>gzip</literal> or
-        <literal>lz4</literal> or <literal>none</literal> for no compression.
+        <literal>lz4</literal>, or <literal>none</literal> for no compression.
         A compression detail string can optionally be specified.  If the
         detail string is an integer, it specifies the compression level.
         Otherwise, it should be a comma-separated list of items, each of the
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 72b19ee6cde..ad7bc5c194b 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -4248,10 +4248,10 @@ foreach my $run (sort keys %pgdump_runs)
 	my $test_key = $run;
 	my $run_db   = 'postgres';
 
-	# Skip command-level tests for gzip if there is no support for it.
+	# Skip command-level tests for gzip/lz4 if they're not supported.
 	if ($pgdump_runs{$run}->{compile_option} &&
-		($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
-		($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4))
+		(($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
+		($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)))
 	{
 		note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
 		next;
-- 
2.34.1

From 083f1092aaf6e32a99649f02ad986df66a1d0d82 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 26 Feb 2023 11:32:34 -0600
Subject: [PATCH 3/4] pg_dump/lz4: use lz4 streaming compression..

Since 0da243fed, each row was independently compressed, but that gives
poor compression (especially for tables with few columns) due to not
sharing compression state.

Also, respect the user's requested compression level.
---
 src/bin/pg_dump/compress_lz4.c | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 7dacfeae469..32a8f668907 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -40,6 +40,7 @@ typedef struct LZ4CompressorState
 {
 	char	   *outbuf;
 	size_t		outsize;
+	LZ4_stream_t *stream;
 } LZ4CompressorState;
 
 /* Private routines that support LZ4 compressed data I/O */
@@ -90,8 +91,12 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 		LZ4cs->outsize = requiredsize;
 	}
 
-	compressed = LZ4_compress_default(data, LZ4cs->outbuf,
-									  dLen, LZ4cs->outsize);
+	if (LZ4cs->stream == NULL)
+		LZ4cs->stream = LZ4_createStream();
+
+	compressed = LZ4_compress_fast_continue(LZ4cs->stream, data, LZ4cs->outbuf,
+			dLen, LZ4cs->outsize,
+			AH->compression_spec.level);
 
 	if (compressed <= 0)
 		pg_fatal("failed to LZ4 compress data");
@@ -105,6 +110,10 @@ EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 	LZ4CompressorState *LZ4cs;
 
 	LZ4cs = (LZ4CompressorState *) cs->private_data;
+
+	if (LZ4cs->stream != NULL)
+		LZ4_freeStream(LZ4cs->stream);
+
 	pg_free(LZ4cs->outbuf);
 	pg_free(LZ4cs);
 }
-- 
2.34.1

From 9c4fc70af1bba238b639639a55df8f26d44e36fe Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 26 Feb 2023 11:41:47 -0600
Subject: [PATCH 4/4] WIP: +change to use LZ4 frame API

This uses buffering to avoid flushing and writing a block header for
each row, for an additional improvement in compressed size.

XXX: update archive version since this changes the meaning of "lz4" in
the header
---
 src/bin/pg_dump/compress_lz4.c | 102 +++++++++++++++++++++++++--------
 1 file changed, 79 insertions(+), 23 deletions(-)

diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 32a8f668907..88a819c553b 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -40,7 +40,7 @@ typedef struct LZ4CompressorState
 {
 	char	   *outbuf;
 	size_t		outsize;
-	LZ4_stream_t *stream;
+	LZ4F_compressionContext_t ctx;
 } LZ4CompressorState;
 
 /* Private routines that support LZ4 compressed data I/O */
@@ -52,29 +52,59 @@ static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
 static void
 ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
-	LZ4_streamDecode_t lz4StreamDecode;
+	size_t		res;
 	char	   *buf;
 	char	   *decbuf;
 	size_t		buflen;
+	size_t		decbuflen;
 	size_t		cnt;
 
+	LZ4F_decompressOptions_t opts = {.stableDst = 0}; /* not stable */
+	LZ4F_decompressionContext_t dtx;
+
+	res = LZ4F_createDecompressionContext(&dtx, LZ4F_VERSION);
+	if (LZ4F_isError(res))
+		pg_fatal("failed to LZ4F_createDecompressionContext: %s",
+				LZ4F_getErrorName(res));
+
 	buflen = LZ4_IN_SIZE;
 	buf = pg_malloc(buflen);
-	decbuf = pg_malloc(buflen);
 
-	LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
+	decbuflen = LZ4_IN_SIZE;
+	decbuf = pg_malloc(LZ4_IN_SIZE);
 
 	while ((cnt = cs->readF(AH, &buf, &buflen)))
 	{
-		int			decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
-															buf, decbuf,
-															cnt, buflen);
+		char	*end = buf + cnt;
 
-		ahwrite(decbuf, 1, decBytes, AH);
+		for (char *ptr = buf; ptr != end;)
+		{
+			size_t			decBytes = decbuflen;
+			size_t			srcBytes = cnt;
+
+			res = LZ4F_decompress(dtx,
+					decbuf, &decBytes,
+					ptr, &srcBytes, &opts);
+			if (LZ4F_isError(res))
+				pg_fatal("failed to LZ4F_decompress: %s", LZ4F_getErrorName(res));
+
+			ptr += srcBytes;
+			cnt -= srcBytes;
+			if (decBytes > 0)
+				ahwrite(decbuf, 1, decBytes, AH);
+
+			if (decbuflen < res)
+			{
+				/* resize the buffer to the expected size */
+				decbuf = pg_realloc(decbuf, res);
+				decbuflen = res;
+			}
+		}
 	}
 
 	pg_free(buf);
 	pg_free(decbuf);
+	LZ4F_freeDecompressionContext(dtx);
 }
 
 static void
@@ -82,40 +112,66 @@ WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
 					  const void *data, size_t dLen)
 {
 	LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private_data;
-	size_t		compressed;
-	size_t		requiredsize = LZ4_compressBound(dLen);
+	size_t		requiredsize = LZ4F_compressBound(dLen, NULL);
+	size_t		res;
 
+	char *oldout = LZ4cs->outbuf;
 	if (requiredsize > LZ4cs->outsize)
 	{
 		LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
 		LZ4cs->outsize = requiredsize;
 	}
 
-	if (LZ4cs->stream == NULL)
-		LZ4cs->stream = LZ4_createStream();
-
-	compressed = LZ4_compress_fast_continue(LZ4cs->stream, data, LZ4cs->outbuf,
-			dLen, LZ4cs->outsize,
-			AH->compression_spec.level);
+	if (oldout == NULL)
+	{
+		LZ4F_preferences_t prefs = {
+			.compressionLevel = cs->compression_spec.level
+		};
+
+		res = LZ4F_createCompressionContext(&LZ4cs->ctx, LZ4F_VERSION);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_createCompressionContext: %s", LZ4F_getErrorName(res));
+
+		res = LZ4F_compressBegin(LZ4cs->ctx, LZ4cs->outbuf, LZ4cs->outsize, &prefs);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_compressBegin: %s", LZ4F_getErrorName(res));
+		cs->writeF(AH, LZ4cs->outbuf, res);
+	}
 
-	if (compressed <= 0)
-		pg_fatal("failed to LZ4 compress data");
+	res = LZ4F_compressUpdate(LZ4cs->ctx, LZ4cs->outbuf, LZ4cs->outsize,
+								 data, dLen, NULL);
+	if (LZ4F_isError(res))
+		pg_fatal("failed to LZ4F_compressUpdate: %s", LZ4F_getErrorName(res));
 
-	cs->writeF(AH, LZ4cs->outbuf, compressed);
+	if (res > 0)
+		cs->writeF(AH, LZ4cs->outbuf, res);
 }
 
 static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
 	LZ4CompressorState *LZ4cs;
+	size_t	res;
 
 	LZ4cs = (LZ4CompressorState *) cs->private_data;
 
-	if (LZ4cs->stream != NULL)
-		LZ4_freeStream(LZ4cs->stream);
+	if (LZ4cs->outbuf != NULL)
+	{
+		res = LZ4F_compressEnd(LZ4cs->ctx, LZ4cs->outbuf, LZ4cs->outsize, NULL);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_compressEnd: %s", LZ4F_getErrorName(res));
+
+		if (res > 0)
+			cs->writeF(AH, LZ4cs->outbuf, res);
 
-	pg_free(LZ4cs->outbuf);
-	pg_free(LZ4cs);
+		LZ4F_freeCompressionContext(LZ4cs->ctx);
+		if (LZ4F_isError(res))
+			pg_fatal("failed to LZ4F_freeCompressionContext: %s", LZ4F_getErrorName(res));
+
+		pg_free(LZ4cs->outbuf);
+		LZ4cs->outbuf = NULL;
+		pg_free(LZ4cs);
+	}
 }
 
 
-- 
2.34.1

Reply via email to