On Fri, Mar 10, 2023 at 12:48:13PM -0800, Jacob Champion wrote:
> On Wed, Mar 8, 2023 at 10:59 AM Jacob Champion <[email protected]>
> wrote:
> > I did some smoke testing against zstd's GitHub release on Windows. To
> > build against it, I had to construct an import library, and put that
> > and the DLL into the `lib` folder expected by the MSVC scripts...
> > which makes me wonder if I've chosen a harder way than necessary?
>
> It looks like pg_dump's meson.build is missing dependencies on zstd
> (meson couldn't find the headers in the subproject without them).
I saw that this was added for LZ4, but I hadn't added it for zstd, since
I didn't run into an issue without it. Could you check that what I've
added works for your case?
> > Parallel zstd dumps seem to work as expected, in that the resulting
> > pg_restore output is identical to uncompressed dumps and nothing
> > explodes. I haven't inspected the threading implementation for safety
> > yet, as you mentioned.
>
> Hm. Best I can tell, the CloneArchive() machinery is supposed to be
> handling safety for this, by isolating each thread's state. I don't feel
> comfortable pronouncing this new addition safe or not, because I'm not
> sure I understand what the comments in the format-specific _Clone()
> callbacks are saying yet.
My line of reasoning for Unix is that pg_dump forks before any calls to
zstd, so nothing zstd does ought to affect the pg_dump layer. But that
doesn't apply to pg_dump under Windows, where parallel workers are
threads. This is an open question. If there's no solid answer, I could
disable/ignore the option (maybe only under Windows).
> On to code (not a complete review):
>
> > if (hasSuffix(fname, ".gz"))
> > compression_spec.algorithm = PG_COMPRESSION_GZIP;
> > else
> > {
> > bool exists;
> >
> > exists = (stat(path, &st) == 0);
> > /* avoid unused warning if it is not built with compression */
> > if (exists)
> > compression_spec.algorithm = PG_COMPRESSION_NONE;
> > -#ifdef HAVE_LIBZ
> > - if (!exists)
> > - {
> > - free_keep_errno(fname);
> > - fname = psprintf("%s.gz", path);
> > - exists = (stat(fname, &st) == 0);
> > -
> > - if (exists)
> > - compression_spec.algorithm = PG_COMPRESSION_GZIP;
> > - }
> > -#endif
> > -#ifdef USE_LZ4
> > - if (!exists)
> > - {
> > - free_keep_errno(fname);
> > - fname = psprintf("%s.lz4", path);
> > - exists = (stat(fname, &st) == 0);
> > -
> > - if (exists)
> > - compression_spec.algorithm = PG_COMPRESSION_LZ4;
> > - }
> > -#endif
> > + else if (check_compressed_file(path, &fname, "gz"))
> > + compression_spec.algorithm = PG_COMPRESSION_GZIP;
> > + else if (check_compressed_file(path, &fname, "lz4"))
> > + compression_spec.algorithm = PG_COMPRESSION_LZ4;
> > + else if (check_compressed_file(path, &fname, "zst"))
> > + compression_spec.algorithm = PG_COMPRESSION_ZSTD;
> > }
>
> This function lost some coherence, I think. Should there be a hasSuffix
> check at the top for ".zstd" (and, for that matter, ".lz4")?
The function first checks whether it was passed a filename that already
has a suffix. If not, it searches through a list of suffixes, testing
for an existing file with each one; that stat() search doesn't happen
when the name already has a suffix. I'm having trouble seeing how the
hasSuffix() branch isn't dead code, though. Another open question.
> I'm a little suspicious of the replacement of supports_compression()
> with parse_compress_specification(). For example:
>
> > - errmsg = supports_compression(AH->compression_spec);
> > - if (errmsg)
> > + parse_compress_specification(AH->compression_spec.algorithm,
> > + NULL, &compress_spec);
> > + if (compress_spec.parse_error != NULL)
> > {
> > pg_log_warning("archive is compressed, but this installation does
> > not support compression (%s
> > - errmsg);
> > - pg_free(errmsg);
> > + compress_spec.parse_error);
> > + pg_free(compress_spec.parse_error);
> > }
>
> The top-level error here is "does not support compression", but wouldn't
> a bad specification option with a supported compression method trip this
> path too?
No - since the 2nd argument (the detail string) is passed as NULL, it
just checks whether the compression algorithm is supported. Maybe there
ought to be a more direct/clean way to do it, but up to now evidently
nobody has needed one.
> > +static void
> > +ZSTD_CCtx_setParam_or_die(ZSTD_CStream *cstream,
> > + ZSTD_cParameter param, int value, char *paramname)
>
> IMO we should avoid stepping on the ZSTD_ namespace with our own
> internal function names.
done
> > + if (cs->readF != NULL)
> > +
> > + if (cs->writeF != NULL)
>
> This seems to suggest that both cs->readF and cs->writeF could be set,
> but in that case, the output buffer gets reallocated.
I put back an assertion that exactly one of them is set, since that's
how it currently works.
> I was curious about the extra byte allocated in the decompression case.
> I see that ReadDataFromArchiveZstd() is null-terminating the buffer
> before handing it to ahwrite(), but why does it need to do that?
I was trying to figure that out, too. I think the termination might be
for ExecuteSqlCommandBuf(), and may only have been needed to allow
pg_restore to handle ancient/development versions of pg_dump... It's
not currently hit.
https://coverage.postgresql.org/src/bin/pg_dump/pg_backup_db.c.gcov.html#470
I found that the terminator for the uncompressed case was added in
e8f69be05 and removed in bf9aa490d.
> > +Zstd_get_error(CompressFileHandle *CFH)
>
> Seems like this should be using the zstderror stored in the handle?
Yes - I'd already addressed that locally.
> In ReadDataFromArchiveZstd():
>
> > + * Read compressed data. Note that readF can resize the buffer; the
> > + * new size is tracked and used for future loops.
> This is pretty complex for what it's doing. I'm a little worried that we
> let the reallocated buffer escape to the caller while losing track of
> how big it is. I think that works today, since there's only ever one
> call per handle, but any future refactoring that allowed cs->readData()
> to be called more than once would subtly break this code.
Note that nothing bad happens if we lose track of how big it is (well,
assuming that readF doesn't *shrink* the buffer). The previous patch
version didn't keep track of the new size, and the only consequence was
that readF() might resize the buffer again on a future iteration, even
if it was already sufficiently large.
When I originally wrote it (and up until that patch version), I left
this as an XXX comment about reusing the resized buffer. But it seemed
easy enough to fix so I did.
> In ZstdWriteCommon():
>
> Elsewhere, output->pos is set to zero before compressing, but here we do
> it after, which I think leads to subtle differences in the function
> preconditions. If that's an intentional difference, can the reason be
> called out in a comment?
It's not deliberate. I think it had no effect, but I've changed it - thanks.
--
Justin
>From 605bfb6974503bb71bdc09c9539313ef92d19ff3 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <[email protected]>
Date: Sat, 7 Jan 2023 15:45:06 -0600
Subject: [PATCH 1/3] pg_dump: zstd compression
Previously proposed at: [email protected]
---
doc/src/sgml/ref/pg_dump.sgml | 8 +-
src/bin/pg_dump/Makefile | 2 +
src/bin/pg_dump/compress_io.c | 85 ++---
src/bin/pg_dump/compress_zstd.c | 523 ++++++++++++++++++++++++++
src/bin/pg_dump/compress_zstd.h | 25 ++
src/bin/pg_dump/meson.build | 10 +-
src/bin/pg_dump/pg_backup_archiver.c | 28 +-
src/bin/pg_dump/pg_backup_directory.c | 2 +
src/bin/pg_dump/pg_dump.c | 13 -
src/bin/pg_dump/t/002_pg_dump.pl | 79 +++-
src/tools/pginclude/cpluspluscheck | 1 +
11 files changed, 683 insertions(+), 93 deletions(-)
create mode 100644 src/bin/pg_dump/compress_zstd.c
create mode 100644 src/bin/pg_dump/compress_zstd.h
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index e6b003bf104..4652087144f 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -330,8 +330,9 @@ PostgreSQL documentation
machine-readable format that <application>pg_restore</application>
can read. A directory format archive can be manipulated with
standard Unix tools; for example, files in an uncompressed archive
- can be compressed with the <application>gzip</application> or
- <application>lz4</application> tools.
+ can be compressed with the <application>gzip</application>,
+ <application>lz4</application>, or
+ <application>zstd</application> tools.
This format is compressed by default using <literal>gzip</literal>
and also supports parallel dumps.
</para>
@@ -655,7 +656,8 @@ PostgreSQL documentation
<para>
Specify the compression method and/or the compression level to use.
The compression method can be set to <literal>gzip</literal>,
- <literal>lz4</literal>, or <literal>none</literal> for no compression.
+ <literal>lz4</literal>, <literal>zstd</literal>,
+ or <literal>none</literal> for no compression.
A compression detail string can optionally be specified. If the
detail string is an integer, it specifies the compression level.
Otherwise, it should be a comma-separated list of items, each of the
diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index eb8f59459a1..24de7593a6a 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -18,6 +18,7 @@ include $(top_builddir)/src/Makefile.global
export GZIP_PROGRAM=$(GZIP)
export LZ4
+export ZSTD
export with_icu
override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
@@ -29,6 +30,7 @@ OBJS = \
compress_io.o \
compress_lz4.o \
compress_none.o \
+ compress_zstd.o \
dumputils.o \
parallel.o \
pg_backup_archiver.o \
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index ce06f1eac9c..21972933d63 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -52,8 +52,8 @@
*
* InitDiscoverCompressFileHandle tries to infer the compression by the
* filename suffix. If the suffix is not yet known then it tries to simply
- * open the file and if it fails, it tries to open the same file with the .gz
- * suffix, and then again with the .lz4 suffix.
+ * open the file and if it fails, it tries to open the same file with
+ * compressed suffixes.
*
* IDENTIFICATION
* src/bin/pg_dump/compress_io.c
@@ -69,6 +69,7 @@
#include "compress_io.h"
#include "compress_lz4.h"
#include "compress_none.h"
+#include "compress_zstd.h"
#include "pg_backup_utils.h"
/*----------------------
@@ -76,36 +77,6 @@
*----------------------
*/
-/*
- * Checks whether a compression algorithm is supported.
- *
- * On success returns NULL, otherwise returns a malloc'ed string which can be
- * used by the caller in an error message.
- */
-char *
-supports_compression(const pg_compress_specification compression_spec)
-{
- const pg_compress_algorithm algorithm = compression_spec.algorithm;
- bool supported = false;
-
- if (algorithm == PG_COMPRESSION_NONE)
- supported = true;
-#ifdef HAVE_LIBZ
- if (algorithm == PG_COMPRESSION_GZIP)
- supported = true;
-#endif
-#ifdef USE_LZ4
- if (algorithm == PG_COMPRESSION_LZ4)
- supported = true;
-#endif
-
- if (!supported)
- return psprintf("this build does not support compression with %s",
- get_compress_algorithm_name(algorithm));
-
- return NULL;
-}
-
/*----------------------
* Compressor API
*----------------------
@@ -130,6 +101,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
InitCompressorGzip(cs, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
InitCompressorLZ4(cs, compression_spec);
+ else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+ InitCompressorZstd(cs, compression_spec);
return cs;
}
@@ -196,20 +169,30 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
InitCompressFileHandleGzip(CFH, compression_spec);
else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
InitCompressFileHandleLZ4(CFH, compression_spec);
+ else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+ InitCompressFileHandleZstd(CFH, compression_spec);
return CFH;
}
+static bool
+check_compressed_file(const char *path, char **fname, char *ext)
+{
+ free_keep_errno(*fname);
+ *fname = psprintf("%s.%s", path, ext);
+ return (access(*fname, F_OK) == 0);
+}
+
/*
* Open a file for reading. 'path' is the file to open, and 'mode' should
* be either "r" or "rb".
*
* If the file at 'path' contains the suffix of a supported compression method,
- * currently this includes ".gz" and ".lz4", then this compression will be used
+ * currently this includes ".gz", ".lz4" and ".zst", then this compression will be used
* throughout. Otherwise the compression will be inferred by iteratively trying
* to open the file at 'path', first as is, then by appending known compression
* suffixes. So if you pass "foo" as 'path', this will open either "foo" or
- * "foo.gz" or "foo.lz4", trying in that order.
+ * "foo.{gz,lz4,zst}", trying in that order.
*
* On failure, return NULL with an error code in errno.
*/
@@ -231,34 +214,14 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
compression_spec.algorithm = PG_COMPRESSION_GZIP;
else
{
- bool exists;
-
- exists = (stat(path, &st) == 0);
- /* avoid unused warning if it is not built with compression */
- if (exists)
+ if (stat(path, &st) == 0)
compression_spec.algorithm = PG_COMPRESSION_NONE;
-#ifdef HAVE_LIBZ
- if (!exists)
- {
- free_keep_errno(fname);
- fname = psprintf("%s.gz", path);
- exists = (stat(fname, &st) == 0);
-
- if (exists)
- compression_spec.algorithm = PG_COMPRESSION_GZIP;
- }
-#endif
-#ifdef USE_LZ4
- if (!exists)
- {
- free_keep_errno(fname);
- fname = psprintf("%s.lz4", path);
- exists = (stat(fname, &st) == 0);
-
- if (exists)
- compression_spec.algorithm = PG_COMPRESSION_LZ4;
- }
-#endif
+ else if (check_compressed_file(path, &fname, "gz"))
+ compression_spec.algorithm = PG_COMPRESSION_GZIP;
+ else if (check_compressed_file(path, &fname, "lz4"))
+ compression_spec.algorithm = PG_COMPRESSION_LZ4;
+ else if (check_compressed_file(path, &fname, "zst"))
+ compression_spec.algorithm = PG_COMPRESSION_ZSTD;
}
CFH = InitCompressFileHandle(compression_spec);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
new file mode 100644
index 00000000000..dc610a64de5
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -0,0 +1,523 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_zstd.c
+ * Routines for archivers to write a Zstd compressed data stream.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/bin/pg_dump/compress_zstd.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "pg_backup_utils.h"
+#include "compress_zstd.h"
+
+#ifndef USE_ZSTD
+
+void
+InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
+{
+ pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+{
+ pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+#else
+
+#include <zstd.h>
+
+typedef struct ZstdCompressorState
+{
+ /* This is a normal file to which we read/write compressed data */
+ FILE *fp;
+
+ ZSTD_CStream *cstream;
+ ZSTD_DStream *dstream;
+ ZSTD_outBuffer output;
+ ZSTD_inBuffer input;
+
+ /* pointer to a static string like from strerror(), for Zstd_write() */
+ const char *zstderror;
+} ZstdCompressorState;
+
+static ZSTD_CStream *ZstdCStreamParams(pg_compress_specification compress);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+ const void *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
+
+static void
+Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
+ ZSTD_cParameter param, int value, char *paramname)
+{
+ size_t res;
+
+ res = ZSTD_CCtx_setParameter(cstream, param, value);
+ if (ZSTD_isError(res))
+ pg_fatal("could not set compression parameter: \"%s\": %s",
+ paramname, ZSTD_getErrorName(res));
+}
+
+/* Return a compression stream with parameters set per argument */
+static ZSTD_CStream *
+ZstdCStreamParams(pg_compress_specification compress)
+{
+ ZSTD_CStream *cstream;
+
+ cstream = ZSTD_createCStream();
+ if (cstream == NULL)
+ pg_fatal("could not initialize compression library");
+
+ Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
+ compress.level, "level");
+
+ if (compress.options & PG_COMPRESSION_OPTION_WORKERS)
+ Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers,
+ compress.workers, "workers");
+
+ return cstream;
+}
+
+/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
+static void
+ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+ ZSTD_inBuffer *input = &zstdcs->input;
+ ZSTD_outBuffer *output = &zstdcs->output;
+
+ /* Loop while there's any input or until flushed */
+ while (input->pos != input->size || flush)
+ {
+ size_t res;
+
+ output->pos = 0;
+ res = ZSTD_compressStream2(zstdcs->cstream, output,
+ input, flush ? ZSTD_e_end : ZSTD_e_continue);
+
+ if (ZSTD_isError(res))
+ pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+ /*
+ * Extra paranoia: avoid zero-length chunks, since a zero length chunk
+ * is the EOF marker in the custom format. This should never happen
+ * but...
+ */
+ if (output->pos > 0)
+ cs->writeF(AH, output->dst, output->pos);
+
+ if (res == 0)
+ break;
+ }
+}
+
+void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+
+ /* We expect that exactly one of readF/writeF is specified */
+ Assert((cs->readF == NULL) != (cs->writeF == NULL));
+
+ if (cs->readF != NULL)
+ {
+ Assert(zstdcs->cstream == NULL);
+ ZSTD_freeDStream(zstdcs->dstream);
+ pg_free(unconstify(void *, zstdcs->input.src));
+ }
+ else if (cs->writeF != NULL)
+ {
+ Assert(zstdcs->dstream == NULL);
+ ZstdWriteCommon(AH, cs, true);
+ ZSTD_freeCStream(zstdcs->cstream);
+ pg_free(zstdcs->output.dst);
+ }
+
+ pg_free(zstdcs);
+}
+
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+ const void *data, size_t dLen)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+
+ zstdcs->input.src = data;
+ zstdcs->input.size = dLen;
+ zstdcs->input.pos = 0;
+
+ ZstdWriteCommon(AH, cs, false);
+}
+
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+ ZSTD_outBuffer *output = &zstdcs->output;
+ ZSTD_inBuffer *input = &zstdcs->input;
+ size_t input_allocated_size = ZSTD_DStreamInSize();
+ size_t res;
+
+ for (;;)
+ {
+ size_t cnt;
+
+ /*
+ * Read compressed data. Note that readF can resize the buffer; the
+ * new size is tracked and used for future loops.
+ */
+ input->size = input_allocated_size;
+ cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
+ input_allocated_size = input->size;
+ input->size = cnt;
+ input->pos = 0;
+
+ if (cnt == 0)
+ break;
+
+ /* Now decompress */
+ while (input->pos < input->size)
+ {
+ output->pos = 0;
+ res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+ if (ZSTD_isError(res))
+ pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+ /* then write the decompressed data to the output handle */
+ ((char *) output->dst)[output->pos] = '\0';
+ ahwrite(output->dst, 1, output->pos, AH);
+
+ if (res == 0)
+ break; /* End of frame */
+ }
+ }
+}
+
+/* Public routine that supports Zstd compressed data I/O */
+void
+InitCompressorZstd(CompressorState *cs,
+ const pg_compress_specification compression_spec)
+{
+ ZstdCompressorState *zstdcs;
+
+ cs->readData = ReadDataFromArchiveZstd;
+ cs->writeData = WriteDataToArchiveZstd;
+ cs->end = EndCompressorZstd;
+
+ cs->compression_spec = compression_spec;
+
+ zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
+ cs->private_data = zstdcs;
+
+ /* We expect that exactly one of readF/writeF is specified */
+ Assert((cs->readF == NULL) != (cs->writeF == NULL));
+
+ if (cs->readF != NULL)
+ {
+ zstdcs->dstream = ZSTD_createDStream();
+ if (zstdcs->dstream == NULL)
+ pg_fatal("could not initialize compression library");
+
+ zstdcs->input.size = ZSTD_DStreamInSize();
+ zstdcs->input.src = pg_malloc(zstdcs->input.size);
+
+ zstdcs->output.size = ZSTD_DStreamOutSize();
+ zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
+ }
+ else if (cs->writeF != NULL)
+ {
+ zstdcs->cstream = ZstdCStreamParams(cs->compression_spec);
+
+ zstdcs->output.size = ZSTD_CStreamOutSize();
+ zstdcs->output.dst = pg_malloc(zstdcs->output.size);
+ zstdcs->output.pos = 0;
+ }
+}
+
+/*
+ * Compressed stream API
+ */
+
+static size_t
+Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+ ZSTD_inBuffer *input = &zstdcs->input;
+ ZSTD_outBuffer *output = &zstdcs->output;
+ size_t input_allocated_size = ZSTD_DStreamInSize();
+ size_t res,
+ cnt;
+
+ output->size = size;
+ output->dst = ptr;
+ output->pos = 0;
+
+ for (;;)
+ {
+ Assert(input->pos <= input->size);
+ Assert(input->size <= input_allocated_size);
+
+ /* If the input is completely consumed, start back at the beginning */
+ if (input->pos == input->size)
+ {
+ /* input->size is size produced by "fread" */
+ input->size = 0;
+ /* input->pos is position consumed by decompress */
+ input->pos = 0;
+ }
+
+ /* read compressed data if we must produce more input */
+ if (input->pos == input->size)
+ {
+ cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
+ input->size = cnt;
+
+ Assert(cnt >= 0);
+ Assert(cnt <= input_allocated_size);
+
+ /* If we have no more input to consume, we're done */
+ if (cnt == 0)
+ break;
+ }
+
+ while (input->pos < input->size)
+ {
+ /* now decompress */
+ res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+
+ if (ZSTD_isError(res))
+ pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+ if (output->pos == output->size)
+ break; /* No more room for output */
+
+ if (res == 0)
+ break; /* End of frame */
+ }
+
+ if (output->pos == output->size)
+ break; /* We read all the data that fits */
+ }
+
+ return output->pos;
+}
+
+static size_t
+Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+ ZSTD_inBuffer *input = &zstdcs->input;
+ ZSTD_outBuffer *output = &zstdcs->output;
+ size_t res,
+ cnt;
+
+ input->src = ptr;
+ input->size = size;
+ input->pos = 0;
+
+ /* Consume all input, to be flushed later */
+ while (input->pos != input->size)
+ {
+ output->pos = 0;
+ res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
+ if (ZSTD_isError(res))
+ {
+ zstdcs->zstderror = ZSTD_getErrorName(res);
+ return -1;
+ }
+
+ cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+ if (cnt != output->pos)
+ {
+ zstdcs->zstderror = strerror(errno);
+ return -1;
+ }
+ }
+
+ return size;
+}
+
+static int
+Zstd_getc(CompressFileHandle *CFH)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+ int ret;
+
+ if (CFH->read_func(&ret, 1, CFH) != 1)
+ {
+ if (feof(zstdcs->fp))
+ pg_fatal("could not read from input file: end of file");
+ else
+ pg_fatal("could not read from input file: %m");
+ }
+ return ret;
+}
+
+static char *
+Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
+{
+ int i,
+ res;
+
+ /*
+ * Read one byte at a time until newline or EOF. This is only used to read
+ * the list of LOs, and the I/O is buffered anyway.
+ */
+ for (i = 0; i < len - 1; ++i)
+ {
+ res = CFH->read_func(&buf[i], 1, CFH);
+ if (res != 1)
+ break;
+ if (buf[i] == '\n')
+ {
+ ++i;
+ break;
+ }
+ }
+ buf[i] = '\0';
+ return i > 0 ? buf : NULL;
+}
+
+static int
+Zstd_close(CompressFileHandle *CFH)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+ int result;
+
+ if (zstdcs->cstream)
+ {
+ size_t res,
+ cnt;
+ ZSTD_inBuffer *input = &zstdcs->input;
+ ZSTD_outBuffer *output = &zstdcs->output;
+
+ /* Loop until the compression buffers are fully consumed */
+ for (;;)
+ {
+ output->pos = 0;
+ res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
+ if (ZSTD_isError(res))
+ {
+ zstdcs->zstderror = ZSTD_getErrorName(res);
+ return -1;
+ }
+
+ cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+ if (cnt != output->pos)
+ {
+ zstdcs->zstderror = strerror(errno);
+ return -1;
+ }
+
+ if (res == 0)
+ break; /* End of frame */
+ }
+
+ ZSTD_freeCStream(zstdcs->cstream);
+ pg_free(zstdcs->output.dst);
+ }
+
+ if (zstdcs->dstream)
+ {
+ ZSTD_freeDStream(zstdcs->dstream);
+ pg_free(unconstify(void *, zstdcs->input.src));
+ }
+
+ result = fclose(zstdcs->fp);
+ pg_free(zstdcs);
+ return result;
+}
+
+static int
+Zstd_eof(CompressFileHandle *CFH)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+
+ return feof(zstdcs->fp);
+}
+
+static int
+Zstd_open(const char *path, int fd, const char *mode,
+ CompressFileHandle *CFH)
+{
+ FILE *fp;
+ ZstdCompressorState *zstdcs;
+
+ if (fd >= 0)
+ fp = fdopen(fd, mode);
+ else
+ fp = fopen(path, mode);
+
+ if (fp == NULL)
+ return 1;
+
+ zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
+ CFH->private_data = zstdcs;
+ zstdcs->fp = fp;
+
+ if (mode[0] == 'r')
+ {
+ zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
+ zstdcs->dstream = ZSTD_createDStream();
+ if (zstdcs->dstream == NULL)
+ pg_fatal("could not initialize compression library");
+ }
+ else if (mode[0] == 'w' || mode[0] == 'a')
+ {
+ zstdcs->output.size = ZSTD_CStreamOutSize();
+ zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+ zstdcs->cstream = ZstdCStreamParams(CFH->compression_spec);
+ if (zstdcs->cstream == NULL)
+ pg_fatal("could not initialize compression library");
+ }
+ else
+ pg_fatal("unhandled mode");
+
+ return 0;
+}
+
+static int
+Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+{
+ char fname[MAXPGPATH];
+
+ sprintf(fname, "%s.zst", path);
+ return CFH->open_func(fname, -1, mode, CFH);
+}
+
+static const char *
+Zstd_get_error(CompressFileHandle *CFH)
+{
+ ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+ return zstdcs->zstderror;
+}
+
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH,
+ const pg_compress_specification compression_spec)
+{
+ CFH->open_func = Zstd_open;
+ CFH->open_write_func = Zstd_open_write;
+ CFH->read_func = Zstd_read;
+ CFH->write_func = Zstd_write;
+ CFH->gets_func = Zstd_gets;
+ CFH->getc_func = Zstd_getc;
+ CFH->close_func = Zstd_close;
+ CFH->eof_func = Zstd_eof;
+ CFH->get_error_func = Zstd_get_error;
+
+ CFH->compression_spec = compression_spec;
+
+ CFH->private_data = NULL;
+}
+
+#endif /* USE_ZSTD */
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
new file mode 100644
index 00000000000..2aaa6b100b1
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_zstd.h
+ * Zstd interface to compress_io.c routines
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/bin/pg_dump/compress_zstd.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef COMPRESS_ZSTD_H
+#define COMPRESS_ZSTD_H
+
+#include "compress_io.h"
+
+extern void InitCompressorZstd(CompressorState *cs,
+ const pg_compress_specification compression_spec);
+extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
+ const pg_compress_specification compression_spec);
+
+#endif /* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build
index ab4c25c7811..2c5006c61f7 100644
--- a/src/bin/pg_dump/meson.build
+++ b/src/bin/pg_dump/meson.build
@@ -5,6 +5,7 @@ pg_dump_common_sources = files(
'compress_io.c',
'compress_lz4.c',
'compress_none.c',
+ 'compress_zstd.c',
'dumputils.c',
'parallel.c',
'pg_backup_archiver.c',
@@ -19,7 +20,7 @@ pg_dump_common_sources = files(
pg_dump_common = static_library('libpgdump_common',
pg_dump_common_sources,
c_pch: pch_postgres_fe_h,
- dependencies: [frontend_code, libpq, lz4, zlib],
+ dependencies: [frontend_code, libpq, lz4, zlib, zstd],
kwargs: internal_lib_args,
)
@@ -39,7 +40,7 @@ endif
pg_dump = executable('pg_dump',
pg_dump_sources,
link_with: [pg_dump_common],
- dependencies: [frontend_code, libpq, zlib],
+ dependencies: [frontend_code, libpq, zlib, zstd],
kwargs: default_bin_args,
)
bin_targets += pg_dump
@@ -58,7 +59,7 @@ endif
pg_dumpall = executable('pg_dumpall',
pg_dumpall_sources,
link_with: [pg_dump_common],
- dependencies: [frontend_code, libpq, zlib],
+ dependencies: [frontend_code, libpq, zlib, zstd],
kwargs: default_bin_args,
)
bin_targets += pg_dumpall
@@ -77,7 +78,7 @@ endif
pg_restore = executable('pg_restore',
pg_restore_sources,
link_with: [pg_dump_common],
- dependencies: [frontend_code, libpq, zlib],
+ dependencies: [frontend_code, libpq, zlib, zstd],
kwargs: default_bin_args,
)
bin_targets += pg_restore
@@ -90,6 +91,7 @@ tests += {
'env': {
'GZIP_PROGRAM': gzip.path(),
'LZ4': program_lz4.found() ? program_lz4.path() : '',
+ 'ZSTD': program_zstd.found() ? program_zstd.path() : '',
'with_icu': icu.found() ? 'yes' : 'no',
},
'tests': [
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 61ebb8fe85d..6fda84d8774 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -388,10 +388,12 @@ RestoreArchive(Archive *AHX)
{
if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
{
- char *errmsg = supports_compression(AH->compression_spec);
- if (errmsg)
+ pg_compress_specification compress_spec;
+ parse_compress_specification(AH->compression_spec.algorithm,
+ NULL, &compress_spec);
+ if (compress_spec.parse_error != NULL)
pg_fatal("cannot restore from compressed archive (%s)",
- errmsg);
+ compress_spec.parse_error);
else
break;
}
@@ -2075,7 +2077,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
/*
* Check if the specified archive is a directory. If so, check if
- * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
+ * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
*/
if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
{
@@ -2086,10 +2088,17 @@ _discoverArchiveFormat(ArchiveHandle *AH)
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
return AH->format;
#endif
+
#ifdef USE_LZ4
if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
return AH->format;
#endif
+
+#ifdef USE_ZSTD
+ if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
+ return AH->format;
+#endif
+
pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
AH->fSpec);
fh = NULL; /* keep compiler quiet */
@@ -3674,7 +3683,7 @@ WriteHead(ArchiveHandle *AH)
void
ReadHead(ArchiveHandle *AH)
{
- char *errmsg;
+ pg_compress_specification compress_spec;
char vmaj,
vmin,
vrev;
@@ -3745,12 +3754,13 @@ ReadHead(ArchiveHandle *AH)
else
AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
- errmsg = supports_compression(AH->compression_spec);
- if (errmsg)
+ parse_compress_specification(AH->compression_spec.algorithm,
+ NULL, &compress_spec);
+ if (compress_spec.parse_error != NULL)
{
pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
- errmsg);
- pg_free(errmsg);
+ compress_spec.parse_error);
+ pg_free(compress_spec.parse_error);
}
if (AH->version >= K_VERS_1_4)
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 41c2b733e3e..29845340859 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH)
strlcat(fname, ".gz", sizeof(fname));
else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
strlcat(fname, ".lz4", sizeof(fname));
+ else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+ strlcat(fname, ".zst", sizeof(fname));
if (stat(fname, &st) == 0)
te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 2e068c6620e..398a7a7bb3d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -734,19 +734,6 @@ main(int argc, char **argv)
pg_fatal("invalid compression specification: %s",
error_detail);
- switch (compression_algorithm)
- {
- case PG_COMPRESSION_NONE:
- /* fallthrough */
- case PG_COMPRESSION_GZIP:
- /* fallthrough */
- case PG_COMPRESSION_LZ4:
- break;
- case PG_COMPRESSION_ZSTD:
- pg_fatal("compression with %s is not yet supported", "ZSTD");
- break;
- }
-
/*
* Custom and directory formats are compressed by default with gzip when
* available, not the others.
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 42215f82f7a..74f23ae7f74 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -54,8 +54,9 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir;
# those lines) to validate that part of the process.
my $supports_icu = ($ENV{with_icu} eq 'yes');
-my $supports_lz4 = check_pg_config("#define USE_LZ4 1");
my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1");
+my $supports_lz4 = check_pg_config("#define USE_LZ4 1");
+my $supports_zstd = check_pg_config("#define USE_ZSTD 1");
my %pgdump_runs = (
binary_upgrade => {
@@ -213,6 +214,77 @@ my %pgdump_runs = (
},
},
+ compression_zstd_custom => {
+ test_key => 'compression',
+ compile_option => 'zstd',
+ dump_cmd => [
+ 'pg_dump', '--format=custom',
+ '--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump",
+ 'postgres',
+ ],
+ restore_cmd => [
+ 'pg_restore',
+ "--file=$tempdir/compression_zstd_custom.sql",
+ "$tempdir/compression_zstd_custom.dump",
+ ],
+ command_like => {
+ command => [
+ 'pg_restore',
+ '-l', "$tempdir/compression_zstd_custom.dump",
+ ],
+ expected => qr/Compression: zstd/,
+ name => 'data content is zstd compressed'
+ },
+ },
+
+ compression_zstd_dir => {
+ test_key => 'compression',
+ compile_option => 'zstd',
+ dump_cmd => [
+ 'pg_dump', '--jobs=2',
+ '--format=directory', '--compress=zstd:1',
+ "--file=$tempdir/compression_zstd_dir", 'postgres',
+ ],
+ # Give coverage for manually compressed blobs.toc files during
+ # restore.
+ compress_cmd => {
+ program => $ENV{'ZSTD'},
+ args => [
+ '-z', '-f', '--rm',
+ "$tempdir/compression_zstd_dir/blobs.toc",
+ "-o", "$tempdir/compression_zstd_dir/blobs.toc.zst",
+ ],
+ },
+ # Verify that data files were compressed
+ glob_patterns => [
+ "$tempdir/compression_zstd_dir/toc.dat",
+ "$tempdir/compression_zstd_dir/*.dat.zst",
+ ],
+ restore_cmd => [
+ 'pg_restore', '--jobs=2',
+ "--file=$tempdir/compression_zstd_dir.sql",
+ "$tempdir/compression_zstd_dir",
+ ],
+ },
+
+ compression_zstd_plain => {
+ test_key => 'compression',
+ compile_option => 'zstd',
+ dump_cmd => [
+ 'pg_dump', '--format=plain', '--compress=zstd',
+ "--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
+ ],
+ # Decompress the generated file to run through the tests.
+ compress_cmd => {
+ program => $ENV{'ZSTD'},
+ args => [
+ '-d', '-f',
+ "$tempdir/compression_zstd_plain.sql.zst",
+ "-o", "$tempdir/compression_zstd_plain.sql",
+ ],
+ },
+ },
+
clean => {
dump_cmd => [
'pg_dump',
@@ -4648,10 +4720,11 @@ foreach my $run (sort keys %pgdump_runs)
my $test_key = $run;
my $run_db = 'postgres';
- # Skip command-level tests for gzip/lz4 if there is no support for it.
+ # Skip command-level tests for gzip/lz4/zstd if the tool is not supported
if ($pgdump_runs{$run}->{compile_option} &&
(($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
- ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)))
+ ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4) ||
+ ($pgdump_runs{$run}->{compile_option} eq 'zstd' && !$supports_zstd)))
{
note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
next;
diff --git a/src/tools/pginclude/cpluspluscheck b/src/tools/pginclude/cpluspluscheck
index 58039934756..10fb51585c9 100755
--- a/src/tools/pginclude/cpluspluscheck
+++ b/src/tools/pginclude/cpluspluscheck
@@ -154,6 +154,7 @@ do
test "$f" = src/bin/pg_dump/compress_io.h && continue
test "$f" = src/bin/pg_dump/compress_lz4.h && continue
test "$f" = src/bin/pg_dump/compress_none.h && continue
+ test "$f" = src/bin/pg_dump/compress_zstd.h && continue
test "$f" = src/bin/pg_dump/parallel.h && continue
test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue
test "$f" = src/bin/pg_dump/pg_dump.h && continue
--
2.34.1
From 12f839a8b1e601849304bff8b3fc12579d6d4ecc Mon Sep 17 00:00:00 2001
From: Justin Pryzby <[email protected]>
Date: Wed, 4 Jan 2023 21:21:53 -0600
Subject: [PATCH 2/3] TMP: pg_dump: use Zstd by default, for CI only
//-os-only: warnings
---
src/bin/pg_dump/pg_dump.c | 4 ++--
src/bin/pg_dump/t/002_pg_dump.pl | 14 +++++++-------
2 files changed, 9 insertions(+), 9 deletions(-)
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 398a7a7bb3d..cca52d2cd0d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -741,8 +741,8 @@ main(int argc, char **argv)
if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
!user_compression_defined)
{
-#ifdef HAVE_LIBZ
- parse_compress_specification(PG_COMPRESSION_GZIP, NULL,
+#ifdef USE_ZSTD
+ parse_compress_specification(PG_COMPRESSION_ZSTD, NULL,
&compression_spec);
#else
/* Nothing to do in the default case */
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 74f23ae7f74..87f1d2c692d 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -384,10 +384,10 @@ my %pgdump_runs = (
command_like => {
command =>
[ 'pg_restore', '-l', "$tempdir/defaults_custom_format.dump", ],
- expected => $supports_gzip ?
- qr/Compression: gzip/ :
+ expected => $supports_zstd ?
+ qr/Compression: zstd/ :
qr/Compression: none/,
- name => 'data content is gzip-compressed by default if available',
+ name => 'data content is zstd-compressed by default if available',
},
},
@@ -409,16 +409,16 @@ my %pgdump_runs = (
command_like => {
command =>
[ 'pg_restore', '-l', "$tempdir/defaults_dir_format", ],
- expected => $supports_gzip ?
- qr/Compression: gzip/ :
+ expected => $supports_zstd ?
+ qr/Compression: zstd/ :
qr/Compression: none/,
- name => 'data content is gzip-compressed by default',
+ name => 'data content is zstd-compressed by default',
},
glob_patterns => [
"$tempdir/defaults_dir_format/toc.dat",
"$tempdir/defaults_dir_format/blobs.toc",
- $supports_gzip ?
- "$tempdir/defaults_dir_format/*.dat.gz" :
+ $supports_zstd ?
+ "$tempdir/defaults_dir_format/*.dat.zst" :
"$tempdir/defaults_dir_format/*.dat",
],
},
--
2.34.1
From e818c386dd511871a65350e2b2fe3951f78e1cd6 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <[email protected]>
Date: Sun, 27 Mar 2022 11:55:01 -0500
Subject: [PATCH 3/3] zstd: support long distance mode in pg_dump/basebackup
First proposed here:
[email protected]
//-os-only: freebsd
---
doc/src/sgml/protocol.sgml | 10 +++-
doc/src/sgml/ref/pg_basebackup.sgml | 4 +-
doc/src/sgml/ref/pg_dump.sgml | 7 ++-
src/backend/backup/basebackup_zstd.c | 12 ++++
src/bin/pg_basebackup/bbstreamer_zstd.c | 13 +++++
src/bin/pg_basebackup/t/010_pg_basebackup.pl | 9 ++-
src/bin/pg_dump/compress_zstd.c | 5 ++
src/bin/pg_dump/t/002_pg_dump.pl | 3 +-
src/bin/pg_verifybackup/t/008_untar.pl | 8 +++
src/bin/pg_verifybackup/t/010_client_untar.pl | 8 +++
src/common/compression.c | 57 ++++++++++++++++++-
src/include/common/compression.h | 2 +
12 files changed, 130 insertions(+), 8 deletions(-)
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 73b7f4432f3..05a887cd092 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2747,7 +2747,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
level. Otherwise, it should be a comma-separated list of items,
each of the form <replaceable>keyword</replaceable> or
<replaceable>keyword=value</replaceable>. Currently, the supported
- keywords are <literal>level</literal> and <literal>workers</literal>.
+ keywords are <literal>level</literal>, <literal>long</literal>, and
+ <literal>workers</literal>.
</para>
<para>
@@ -2764,6 +2765,13 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<literal>3</literal>).
</para>
+ <para>
+ The <literal>long</literal> keyword enables long-distance matching
+ mode, which improves the compression ratio at the expense of higher
+ memory use.  Long-distance mode is supported only for
+ <literal>zstd</literal>.
+ </para>
+
<para>
The <literal>workers</literal> keyword sets the number of threads
that should be used for parallel compression. Parallel compression
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index db3ad9cd5eb..79d3e657c32 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
level. Otherwise, it should be a comma-separated list of items,
each of the form <literal>keyword</literal> or
<literal>keyword=value</literal>.
- Currently, the supported keywords are <literal>level</literal>
- and <literal>workers</literal>.
+ Currently, the supported keywords are <literal>level</literal>,
+ <literal>long</literal>, and <literal>workers</literal>.
The detail string cannot be used when the compression method
is specified as a plain integer.
</para>
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 4652087144f..9379b98e7f9 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -678,8 +678,11 @@ PostgreSQL documentation
individual table-data segments, and the default is to compress using
<literal>gzip</literal> at a moderate level. For plain text output,
setting a nonzero compression level causes the entire output file to be compressed,
- as though it had been fed through <application>gzip</application> or
- <application>lz4</application>; but the default is not to compress.
+ as though it had been fed through <application>gzip</application>,
+ <application>lz4</application>, or <application>zstd</application>;
+ but the default is not to compress.
+ With zstd compression, <literal>long</literal> mode may improve the
+ compression ratio, at the cost of increased memory use.
</para>
<para>
The tar archive format currently does not support compression at all.
diff --git a/src/backend/backup/basebackup_zstd.c b/src/backend/backup/basebackup_zstd.c
index ac6cac178a0..1bb5820c884 100644
--- a/src/backend/backup/basebackup_zstd.c
+++ b/src/backend/backup/basebackup_zstd.c
@@ -118,6 +118,18 @@ bbsink_zstd_begin_backup(bbsink *sink)
compress->workers, ZSTD_getErrorName(ret)));
}
+ if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+ {
+ ret = ZSTD_CCtx_setParameter(mysink->cctx,
+ ZSTD_c_enableLongDistanceMatching,
+ compress->long_distance);
+ if (ZSTD_isError(ret))
+ ereport(ERROR,
+ errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not set compression flag for %s: %s",
+ "long", ZSTD_getErrorName(ret)));
+ }
+
/*
* We need our own buffer, because we're going to pass different data to
* the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index fe17d6df4ef..fba391e2a0f 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -106,6 +106,19 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *comp
compress->workers, ZSTD_getErrorName(ret));
}
+ if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+ {
+ ret = ZSTD_CCtx_setParameter(streamer->cctx,
+ ZSTD_c_enableLongDistanceMatching,
+ compress->long_distance);
+ if (ZSTD_isError(ret))
+ {
+ pg_log_error("could not set compression flag for %s: %s",
+ "long", ZSTD_getErrorName(ret));
+ exit(1);
+ }
+ }
+
/* Initialize the ZSTD output buffer. */
streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index b60cb78a0d5..4d130a7f944 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -139,7 +139,14 @@ SKIP:
'gzip:workers=3',
'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
'failure on worker count for gzip'
- ],);
+ ],
+ [
+ 'gzip:long',
+ 'invalid compression specification: compression algorithm "gzip" does not support long-distance mode',
+ 'failure on long mode for gzip'
+ ],
+ );
+
for my $cft (@compression_failure_tests)
{
my $cfail = quotemeta($client_fails . $cft->[1]);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index dc610a64de5..ee9513cd555 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -84,6 +84,11 @@ ZstdCStreamParams(pg_compress_specification compress)
Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers,
compress.workers, "workers");
+ if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
+ Zstd_CCtx_setParam_or_die(cstream,
+ ZSTD_c_enableLongDistanceMatching,
+ compress.long_distance, "long");
+
return cstream;
}
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 87f1d2c692d..0a635ae9fc3 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -267,11 +267,12 @@ my %pgdump_runs = (
],
},
+ # Exercise long mode for test coverage
compression_zstd_plain => {
test_key => 'compression',
compile_option => 'zstd',
dump_cmd => [
- 'pg_dump', '--format=plain', '--compress=zstd',
+ 'pg_dump', '--format=plain', '--compress=zstd:long',
"--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
],
# Decompress the generated file to run through the tests.
diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl
index 3007bbe8556..05754bc8ec7 100644
--- a/src/bin/pg_verifybackup/t/008_untar.pl
+++ b/src/bin/pg_verifybackup/t/008_untar.pl
@@ -49,6 +49,14 @@ my @test_configuration = (
'decompress_program' => $ENV{'ZSTD'},
'decompress_flags' => ['-d'],
'enabled' => check_pg_config("#define USE_ZSTD 1")
+ },
+ {
+ 'compression_method' => 'zstd',
+ 'backup_flags' => [ '--compress', 'server-zstd:level=1,long' ],
+ 'backup_archive' => 'base.tar.zst',
+ 'decompress_program' => $ENV{'ZSTD'},
+ 'decompress_flags' => ['-d'],
+ 'enabled' => check_pg_config("#define USE_ZSTD 1")
});
for my $tc (@test_configuration)
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
index f3aa0f59e29..ac51a174d14 100644
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -50,6 +50,14 @@ my @test_configuration = (
'decompress_flags' => ['-d'],
'enabled' => check_pg_config("#define USE_ZSTD 1")
},
+ {
+ 'compression_method' => 'zstd',
+ 'backup_flags' => ['--compress', 'client-zstd:level=1,long'],
+ 'backup_archive' => 'base.tar.zst',
+ 'decompress_program' => $ENV{'ZSTD'},
+ 'decompress_flags' => [ '-d' ],
+ 'enabled' => check_pg_config("#define USE_ZSTD 1")
+ },
{
'compression_method' => 'parallel zstd',
'backup_flags' => [ '--compress', 'client-zstd:workers=3' ],
diff --git a/src/common/compression.c b/src/common/compression.c
index 2d3e56b4d62..713a77c292d 100644
--- a/src/common/compression.c
+++ b/src/common/compression.c
@@ -12,7 +12,7 @@
* Otherwise, a compression specification is a comma-separated list of items,
* each having the form keyword or keyword=value.
*
- * Currently, the only supported keywords are "level" and "workers".
+ * Currently, the supported keywords are "level", "long", and "workers".
*
* Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
*
@@ -38,6 +38,8 @@
static int expect_integer_value(char *keyword, char *value,
pg_compress_specification *result);
+static bool expect_boolean_value(char *keyword, char *value,
+ pg_compress_specification *result);
/*
* Look up a compression algorithm by name. Returns true and sets *algorithm
@@ -232,6 +234,11 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio
result->workers = expect_integer_value(keyword, value, result);
result->options |= PG_COMPRESSION_OPTION_WORKERS;
}
+ else if (strcmp(keyword, "long") == 0)
+ {
+ result->long_distance = expect_boolean_value(keyword, value, result);
+ result->options |= PG_COMPRESSION_OPTION_LONG_DISTANCE;
+ }
else
result->parse_error =
psprintf(_("unrecognized compression option: \"%s\""), keyword);
@@ -289,6 +296,43 @@ expect_integer_value(char *keyword, char *value, pg_compress_specification *resu
return ivalue;
}
+/*
+ * Parse 'value' as a boolean and return the result.
+ *
+ * If parsing fails, set result->parse_error to an appropriate message
+ * and return false.  The caller must check result->parse_error to
+ * determine whether parsing succeeded.
+ *
+ * Valid values are: yes, no, on, off, 1, 0.
+ *
+ * Inspired by ParseVariableBool().
+ */
+static bool
+expect_boolean_value(char *keyword, char *value, pg_compress_specification *result)
+{
+ if (value == NULL)
+ return true;
+
+ if (pg_strcasecmp(value, "yes") == 0)
+ return true;
+ if (pg_strcasecmp(value, "on") == 0)
+ return true;
+ if (pg_strcasecmp(value, "1") == 0)
+ return true;
+
+ if (pg_strcasecmp(value, "no") == 0)
+ return false;
+ if (pg_strcasecmp(value, "off") == 0)
+ return false;
+ if (pg_strcasecmp(value, "0") == 0)
+ return false;
+
+ result->parse_error =
+ psprintf(_("value for compression option \"%s\" must be a boolean"),
+ keyword);
+ return false;
+}
+
/*
* Returns NULL if the compression specification string was syntactically
* valid and semantically sensible. Otherwise, returns an error message.
@@ -354,6 +398,17 @@ validate_compress_specification(pg_compress_specification *spec)
get_compress_algorithm_name(spec->algorithm));
}
+ /*
+ * Of the compression algorithms that we currently support, only zstd
+ * supports long-distance mode.
+ */
+ if ((spec->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0 &&
+ (spec->algorithm != PG_COMPRESSION_ZSTD))
+ {
+ return psprintf(_("compression algorithm \"%s\" does not support long-distance mode"),
+ get_compress_algorithm_name(spec->algorithm));
+ }
+
return NULL;
}
diff --git a/src/include/common/compression.h b/src/include/common/compression.h
index b48c173022e..6cf8cf396a8 100644
--- a/src/include/common/compression.h
+++ b/src/include/common/compression.h
@@ -27,6 +27,7 @@ typedef enum pg_compress_algorithm
} pg_compress_algorithm;
#define PG_COMPRESSION_OPTION_WORKERS (1 << 0)
+#define PG_COMPRESSION_OPTION_LONG_DISTANCE (1 << 1)
typedef struct pg_compress_specification
{
@@ -34,6 +35,7 @@ typedef struct pg_compress_specification
unsigned options; /* OR of PG_COMPRESSION_OPTION constants */
int level;
int workers;
+ int long_distance;
char *parse_error; /* NULL if parsing was OK, else message */
} pg_compress_specification;
--
2.34.1