On Tue, Mar 28, 2023 at 06:23:26PM +0200, Tomas Vondra wrote:
> On 3/27/23 19:28, Justin Pryzby wrote:
> > On Fri, Mar 17, 2023 at 03:43:31AM +0100, Tomas Vondra wrote:
> >> On 3/16/23 05:50, Justin Pryzby wrote:
> >>> On Fri, Mar 10, 2023 at 12:48:13PM -0800, Jacob Champion wrote:
> >>>> On Wed, Mar 8, 2023 at 10:59 AM Jacob Champion <jchamp...@timescale.com> 
> >>>> wrote:
> >>>>> I did some smoke testing against zstd's GitHub release on Windows. To
> >>>>> build against it, I had to construct an import library, and put that
> >>>>> and the DLL into the `lib` folder expected by the MSVC scripts...
> >>>>> which makes me wonder if I've chosen a harder way than necessary?
> >>>>
> >>>> It looks like pg_dump's meson.build is missing dependencies on zstd
> >>>> (meson couldn't find the headers in the subproject without them).
> >>>
> >>> I saw that this was added for LZ4, but I hadn't added it for zstd since
> >>> I didn't run into an issue without it.  Could you check that what I've
> >>> added works for your case ?
> >>>
> >>>>> Parallel zstd dumps seem to work as expected, in that the resulting
> >>>>> pg_restore output is identical to uncompressed dumps and nothing
> >>>>> explodes. I haven't inspected the threading implementation for safety
> >>>>> yet, as you mentioned.
> >>>>
> >>>> Hm. Best I can tell, the CloneArchive() machinery is supposed to be
> >>>> handling safety for this, by isolating each thread's state. I don't feel
> >>>> comfortable pronouncing this new addition safe or not, because I'm not
> >>>> sure I understand what the comments in the format-specific _Clone()
> >>>> callbacks are saying yet.
> >>>
> >>> My line of reasoning for unix is that pg_dump forks before any calls to
> >>> zstd.  Nothing zstd does ought to affect the pg_dump layer.  But that
> >>> doesn't apply to pg_dump under windows.  This is an opened question.  If
> >>> there's no solid answer, I could disable/ignore the option (maybe only
> >>> under windows).
> >>
> >> I may be missing something, but why would the patch affect this? Why
> >> would it even affect safety of the parallel dump? And I don't see any
> >> changes to the clone stuff ...
> > 
> > zstd supports using threads during compression, with -Z zstd:workers=N.
> > When unix forks, the child processes can't do anything to mess up the
> > state of the parent processes.  
> > 
> > But windows pg_dump uses threads instead of forking, so it seems
> > possible that the pg_dump -j threads that then spawn zstd threads could
> > "leak threads" and break the main thread.  I suspect there's no issue,
> > but we still ought to verify that before declaring it safe.
> 
> OK. I don't have access to a Windows machine so I can't test that. Is it
> possible to disable the zstd threading, until we figure this out?

I think that's what's best.  I made it issue a warning if "workers" was
specified.  It could also be an error, or just ignored.

I considered disabling workers only for windows, but realized that I
haven't tested with threads myself - my local zstd package is compiled
without threading, and I remember having some issue recompiling it with
threading.  Jacob's recipe for using meson wraps works well, but it
still seems better to leave it as a future feature.  I used that recipe
to enabled zstd with threading on CI (except for linux/autoconf).

> >>> The function is first checking if it was passed a filename which already
> >>> has a suffix.  And if not, it searches through a list of suffixes,
> >>> testing for an existing file with each suffix.  The search with stat()
> >>> doesn't happen if it has a suffix.  I'm having trouble seeing how the
> >>> hasSuffix() branch isn't dead code.  Another opened question.
> >>
> >> AFAICS it's done this way because of this comment in pg_backup_directory
> >>
> >>  * ...
> >>  * ".gz" suffix is added to the filenames. The TOC files are never
> >>  * compressed by pg_dump, however they are accepted with the .gz suffix
> >>  * too, in case the user has manually compressed them with 'gzip'.
> >>
> >> I haven't tried, but I believe that if you manually compress the
> >> directory, it may hit this branch.
> > 
> > That would make sense, but when I tried, it didn't work like that.
> > The filenames were all uncompressed names.  Maybe it worked differently
> > in an older release.  Or maybe it changed during development of the
> > parallel-directory-dump patch and it's actually dead code.
> 
> Interesting. Would be good to find out. I wonder if a little bit of
> git-log digging could tell us more. Anyway, until we confirm it's dead
> code, we should probably do what .gz does and have the same check for
> .lz4 and .zst files.

I found that hasSuffix() and cfopen() originated in the refactored patch
Heikki's sent here; there's no history beyond that.

https://www.postgresql.org/message-id/4D3954C7.9060503%40enterprisedb.com

The patch published there appends the .gz within cfopen(), and the
caller writes into the TOC the filename without ".gz".  It seems like
maybe a few hours prior, Heikki may have been appending the ".gz" suffix
in the caller, and then writing the TOC with filename.gz.

The only way I've been able to get a "filename.gz" passed to hasSuffix
is to write a directory-format dump, with LOs, and without compression,
and then compress the blobs with "gzip", and *also* edit the blobs.toc
file to say ".gz" (which isn't necessary since, if the original file
isn't found, the restore would search for files with compressed
suffixes).

So .. it's not *technically* unreachable, but I can't see why it'd be
useful to support editing the *content* of the blob TOC (other than
compressing it).  I might give some weight to the idea if it were also
possible to edit the non-blob TOC; but, it's a binary file, so no.

For now, I made the change to make zstd and lz4 to behave the same here
as .gz, unless Heikki has a memory or a git reflog going back far enough
to further support the idea that the code path isn't useful.

I'm going to set the patch as RFC as a hint to anyone who would want to
make a final review.

-- 
Justin
>From a4d2f22d98c16e16c718733f30d71dfb0e3adfe2 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 7 Jan 2023 15:45:06 -0600
Subject: [PATCH 1/4] pg_dump: zstd compression

Previously proposed at: 20201221194924.gi30...@telsasoft.com
---
 doc/src/sgml/ref/pg_dump.sgml         |  13 +-
 src/bin/pg_dump/Makefile              |   2 +
 src/bin/pg_dump/compress_io.c         |  66 ++--
 src/bin/pg_dump/compress_zstd.c       | 530 ++++++++++++++++++++++++++
 src/bin/pg_dump/compress_zstd.h       |  25 ++
 src/bin/pg_dump/meson.build           |   4 +-
 src/bin/pg_dump/pg_backup_archiver.c  |   9 +-
 src/bin/pg_dump/pg_backup_directory.c |   2 +
 src/bin/pg_dump/pg_dump.c             |  20 +-
 src/bin/pg_dump/t/002_pg_dump.pl      |  79 +++-
 src/tools/pginclude/cpluspluscheck    |   1 +
 11 files changed, 697 insertions(+), 54 deletions(-)
 create mode 100644 src/bin/pg_dump/compress_zstd.c
 create mode 100644 src/bin/pg_dump/compress_zstd.h

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 77299878e02..8de38e0fd0d 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -330,8 +330,9 @@ PostgreSQL documentation
            machine-readable format that <application>pg_restore</application>
            can read. A directory format archive can be manipulated with
            standard Unix tools; for example, files in an uncompressed archive
-           can be compressed with the <application>gzip</application> or
-           <application>lz4</application> tools.
+           can be compressed with the <application>gzip</application>,
+           <application>lz4</application>, or
+           <application>zstd</application> tools.
            This format is compressed by default using <literal>gzip</literal>
            and also supports parallel dumps.
           </para>
@@ -655,7 +656,8 @@ PostgreSQL documentation
        <para>
         Specify the compression method and/or the compression level to use.
         The compression method can be set to <literal>gzip</literal>,
-        <literal>lz4</literal>, or <literal>none</literal> for no compression.
+        <literal>lz4</literal>, <literal>zstd</literal>,
+        or <literal>none</literal> for no compression.
         A compression detail string can optionally be specified.  If the
         detail string is an integer, it specifies the compression level.
         Otherwise, it should be a comma-separated list of items, each of the
@@ -676,8 +678,9 @@ PostgreSQL documentation
         individual table-data segments, and the default is to compress using
         <literal>gzip</literal> at a moderate level. For plain text output,
         setting a nonzero compression level causes the entire output file to be compressed,
-        as though it had been fed through <application>gzip</application> or
-        <application>lz4</application>; but the default is not to compress.
+        as though it had been fed through <application>gzip</application>,
+        <application>lz4</application>, or <application>zstd</application>;
+        but the default is not to compress.
        </para>
        <para>
         The tar archive format currently does not support compression at all.
diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index eb8f59459a1..24de7593a6a 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -18,6 +18,7 @@ include $(top_builddir)/src/Makefile.global
 
 export GZIP_PROGRAM=$(GZIP)
 export LZ4
+export ZSTD
 export with_icu
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
@@ -29,6 +30,7 @@ OBJS = \
 	compress_io.o \
 	compress_lz4.o \
 	compress_none.o \
+	compress_zstd.o \
 	dumputils.o \
 	parallel.o \
 	pg_backup_archiver.o \
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 0972a4f934a..4f06bb024f9 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -52,8 +52,8 @@
  *
  *	InitDiscoverCompressFileHandle tries to infer the compression by the
  *	filename suffix. If the suffix is not yet known then it tries to simply
- *	open the file and if it fails, it tries to open the same file with the .gz
- *	suffix, and then again with the .lz4 suffix.
+ *	open the file and if it fails, it tries to open the same file with
+ *	compressed suffixes.
  *
  * IDENTIFICATION
  *	   src/bin/pg_dump/compress_io.c
@@ -69,6 +69,7 @@
 #include "compress_io.h"
 #include "compress_lz4.h"
 #include "compress_none.h"
+#include "compress_zstd.h"
 #include "pg_backup_utils.h"
 
 /*----------------------
@@ -77,7 +78,8 @@
  */
 
 /*
- * Checks whether a compression algorithm is supported.
+ * Checks whether support for a compression algorithm is implemented in
+ * pg_dump/restore.
  *
  * On success returns NULL, otherwise returns a malloc'ed string which can be
  * used by the caller in an error message.
@@ -98,6 +100,10 @@ supports_compression(const pg_compress_specification compression_spec)
 	if (algorithm == PG_COMPRESSION_LZ4)
 		supported = true;
 #endif
+#ifdef USE_ZSTD
+	if (algorithm == PG_COMPRESSION_ZSTD)
+		supported = true;
+#endif
 
 	if (!supported)
 		return psprintf("this build does not support compression with %s",
@@ -130,6 +136,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
 		InitCompressorGzip(cs, compression_spec);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
 		InitCompressorLZ4(cs, compression_spec);
+	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+		InitCompressorZstd(cs, compression_spec);
 
 	return cs;
 }
@@ -196,20 +204,30 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
 		InitCompressFileHandleGzip(CFH, compression_spec);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
 		InitCompressFileHandleLZ4(CFH, compression_spec);
+	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+		InitCompressFileHandleZstd(CFH, compression_spec);
 
 	return CFH;
 }
 
+static bool
+check_compressed_file(const char *path, char **fname, char *ext)
+{
+	free_keep_errno(*fname);
+	*fname = psprintf("%s.%s", path, ext);
+	return (access(*fname, F_OK) == 0);
+}
+
 /*
  * Open a file for reading. 'path' is the file to open, and 'mode' should
  * be either "r" or "rb".
  *
  * If the file at 'path' contains the suffix of a supported compression method,
- * currently this includes ".gz" and ".lz4", then this compression will be used
+ * currently this includes ".gz", ".lz4" and ".zst", then this compression will be used
  * throughout. Otherwise the compression will be inferred by iteratively trying
  * to open the file at 'path', first as is, then by appending known compression
  * suffixes. So if you pass "foo" as 'path', this will open either "foo" or
- * "foo.gz" or "foo.lz4", trying in that order.
+ * "foo.{gz,lz4,zst}", trying in that order.
  *
  * On failure, return NULL with an error code in errno.
  */
@@ -229,36 +247,20 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 
 	if (hasSuffix(fname, ".gz"))
 		compression_spec.algorithm = PG_COMPRESSION_GZIP;
+	else if (hasSuffix(fname, ".lz4"))
+		compression_spec.algorithm = PG_COMPRESSION_LZ4;
+	else if (hasSuffix(fname, ".zst"))
+		compression_spec.algorithm = PG_COMPRESSION_ZSTD;
 	else
 	{
-		bool		exists;
-
-		exists = (stat(path, &st) == 0);
-		/* avoid unused warning if it is not built with compression */
-		if (exists)
+		if (stat(path, &st) == 0)
 			compression_spec.algorithm = PG_COMPRESSION_NONE;
-#ifdef HAVE_LIBZ
-		if (!exists)
-		{
-			free_keep_errno(fname);
-			fname = psprintf("%s.gz", path);
-			exists = (stat(fname, &st) == 0);
-
-			if (exists)
-				compression_spec.algorithm = PG_COMPRESSION_GZIP;
-		}
-#endif
-#ifdef USE_LZ4
-		if (!exists)
-		{
-			free_keep_errno(fname);
-			fname = psprintf("%s.lz4", path);
-			exists = (stat(fname, &st) == 0);
-
-			if (exists)
-				compression_spec.algorithm = PG_COMPRESSION_LZ4;
-		}
-#endif
+		else if (check_compressed_file(path, &fname, "gz"))
+			compression_spec.algorithm = PG_COMPRESSION_GZIP;
+		else if (check_compressed_file(path, &fname, "lz4"))
+			compression_spec.algorithm = PG_COMPRESSION_LZ4;
+		else if (check_compressed_file(path, &fname, "zst"))
+			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
 	}
 
 	CFH = InitCompressFileHandle(compression_spec);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
new file mode 100644
index 00000000000..c19d5262943
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -0,0 +1,530 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_zstd.c
+ *	 Routines for archivers to write a Zstd compressed data stream.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	   src/bin/pg_dump/compress_zstd.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "pg_backup_utils.h"
+#include "compress_zstd.h"
+
+#ifndef USE_ZSTD
+
+void
+InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
+{
+	pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+{
+	pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+#else
+
+#include <zstd.h>
+
+typedef struct ZstdCompressorState
+{
+	/* This is a normal file to which we read/write compressed data */
+	FILE	   *fp;
+
+	ZSTD_CStream *cstream;
+	ZSTD_DStream *dstream;
+	ZSTD_outBuffer output;
+	ZSTD_inBuffer input;
+
+	/* pointer to a static string like from strerror(), for Zstd_write() */
+	const char *zstderror;
+}			ZstdCompressorState;
+
+static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+								   const void *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
+
+static void
+_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
+						  ZSTD_cParameter param, int value, char *paramname)
+{
+	size_t		res;
+
+	res = ZSTD_CCtx_setParameter(cstream, param, value);
+	if (ZSTD_isError(res))
+		pg_fatal("could not set compression parameter: \"%s\": %s",
+				 paramname, ZSTD_getErrorName(res));
+}
+
+/* Return a compression stream with parameters set per argument */
+static ZSTD_CStream *
+_ZstdCStreamParams(pg_compress_specification compress)
+{
+	ZSTD_CStream *cstream;
+
+	cstream = ZSTD_createCStream();
+	if (cstream == NULL)
+		pg_fatal("could not initialize compression library");
+
+	_Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
+							   compress.level, "level");
+
+	return cstream;
+}
+
+/* Helper function for WriteDataToArchiveZstd and EndCompressorZstd */
+static void
+_ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+
+	/* Loop while there's any input or until flushed */
+	while (input->pos != input->size || flush)
+	{
+		size_t		res;
+
+		output->pos = 0;
+		res = ZSTD_compressStream2(zstdcs->cstream, output,
+								   input, flush ? ZSTD_e_end : ZSTD_e_continue);
+
+		if (ZSTD_isError(res))
+			pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+		/*
+		 * Extra paranoia: avoid zero-length chunks, since a zero length chunk
+		 * is the EOF marker in the custom format. This should never happen
+		 * but...
+		 */
+		if (output->pos > 0)
+			cs->writeF(AH, output->dst, output->pos);
+
+		if (res == 0)
+			break;
+	}
+}
+
+static void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+
+	/* We expect that exactly one of readF/writeF is specified */
+	Assert((cs->readF == NULL) != (cs->writeF == NULL));
+
+	if (cs->readF != NULL)
+	{
+		Assert(zstdcs->cstream == NULL);
+		ZSTD_freeDStream(zstdcs->dstream);
+		pg_free(unconstify(void *, zstdcs->input.src));
+	}
+	else if (cs->writeF != NULL)
+	{
+		Assert(zstdcs->dstream == NULL);
+		_ZstdWriteCommon(AH, cs, true);
+		ZSTD_freeCStream(zstdcs->cstream);
+		pg_free(zstdcs->output.dst);
+	}
+
+	pg_free(zstdcs);
+}
+
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const void *data, size_t dLen)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+
+	zstdcs->input.src = data;
+	zstdcs->input.size = dLen;
+	zstdcs->input.pos = 0;
+
+	_ZstdWriteCommon(AH, cs, false);
+}
+
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+	ZSTD_outBuffer *output = &zstdcs->output;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	size_t		input_allocated_size = ZSTD_DStreamInSize();
+	size_t		res;
+
+	for (;;)
+	{
+		size_t		cnt;
+
+		/*
+		 * Read compressed data.  Note that readF can resize the buffer; the
+		 * new size is tracked and used for future loops.
+		 */
+		input->size = input_allocated_size;
+		cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
+
+		/* ensure that readF didn't *shrink* the buffer */
+		Assert(input->size >= input_allocated_size);
+		input_allocated_size = input->size;
+		input->size = cnt;
+		input->pos = 0;
+
+		if (cnt == 0)
+			break;
+
+		/* Now decompress */
+		while (input->pos < input->size)
+		{
+			output->pos = 0;
+			res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+			if (ZSTD_isError(res))
+				pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			/* then write the decompressed data to the output handle */
+			((char *) output->dst)[output->pos] = '\0';
+			ahwrite(output->dst, 1, output->pos, AH);
+
+			if (res == 0)
+				break;			/* End of frame */
+		}
+	}
+}
+
+/* Public routine that supports Zstd compressed data I/O */
+void
+InitCompressorZstd(CompressorState *cs,
+				   const pg_compress_specification compression_spec)
+{
+	ZstdCompressorState *zstdcs;
+
+	cs->readData = ReadDataFromArchiveZstd;
+	cs->writeData = WriteDataToArchiveZstd;
+	cs->end = EndCompressorZstd;
+
+	cs->compression_spec = compression_spec;
+
+	zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
+	cs->private_data = zstdcs;
+
+	/* We expect that exactly one of readF/writeF is specified */
+	Assert((cs->readF == NULL) != (cs->writeF == NULL));
+
+	if (cs->readF != NULL)
+	{
+		zstdcs->dstream = ZSTD_createDStream();
+		if (zstdcs->dstream == NULL)
+			pg_fatal("could not initialize compression library");
+
+		zstdcs->input.size = ZSTD_DStreamInSize();
+		zstdcs->input.src = pg_malloc(zstdcs->input.size);
+
+		zstdcs->output.size = ZSTD_DStreamOutSize();
+		zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);
+	}
+	else if (cs->writeF != NULL)
+	{
+		zstdcs->cstream = _ZstdCStreamParams(cs->compression_spec);
+
+		zstdcs->output.size = ZSTD_CStreamOutSize();
+		zstdcs->output.dst = pg_malloc(zstdcs->output.size);
+		zstdcs->output.pos = 0;
+	}
+}
+
+/*
+ * Compressed stream API
+ */
+
+static bool
+Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+	size_t		input_allocated_size = ZSTD_DStreamInSize();
+	size_t		res,
+				cnt;
+
+	output->size = size;
+	output->dst = ptr;
+	output->pos = 0;
+
+	for (;;)
+	{
+		Assert(input->pos <= input->size);
+		Assert(input->size <= input_allocated_size);
+
+		/* If the input is completely consumed, start back at the beginning */
+		if (input->pos == input->size)
+		{
+			/* input->size is size produced by "fread" */
+			input->size = 0;
+			/* input->pos is position consumed by decompress */
+			input->pos = 0;
+		}
+
+		/* read compressed data if we must produce more input */
+		if (input->pos == input->size)
+		{
+			cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
+			input->size = cnt;
+
+			Assert(cnt <= input_allocated_size);
+
+			/* If we have no more input to consume, we're done */
+			if (cnt == 0)
+				break;
+		}
+
+		while (input->pos < input->size)
+		{
+			/* now decompress */
+			res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+
+			if (ZSTD_isError(res))
+				pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			if (output->pos == output->size)
+				break;			/* No more room for output */
+
+			if (res == 0)
+				break;			/* End of frame */
+		}
+
+		if (output->pos == output->size)
+			break;				/* We read all the data that fits */
+	}
+
+	if (rdsize != NULL)
+		*rdsize = output->pos;
+
+	return true;
+}
+
+static bool
+Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+	size_t		res,
+				cnt;
+
+	input->src = ptr;
+	input->size = size;
+	input->pos = 0;
+
+	/* Consume all input, to be flushed later */
+	while (input->pos != input->size)
+	{
+		output->pos = 0;
+		res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
+		if (ZSTD_isError(res))
+		{
+			zstdcs->zstderror = ZSTD_getErrorName(res);
+			return false;
+		}
+
+		cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+		if (cnt != output->pos)
+		{
+			zstdcs->zstderror = strerror(errno);
+			return false;
+		}
+	}
+
+	return size;
+}
+
+static int
+Zstd_getc(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	int			ret;
+
+	if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
+	{
+		if (feof(zstdcs->fp))
+			pg_fatal("could not read from input file: end of file");
+		else
+			pg_fatal("could not read from input file: %m");
+	}
+	return ret;
+}
+
+static char *
+Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
+{
+	int			i;
+
+	Assert(len > 0);
+
+	/*
+	 * Read one byte at a time until newline or EOF. This is only used to read
+	 * the list of LOs, and the I/O is buffered anyway.
+	 */
+	for (i = 0; i < len - 1; ++i)
+	{
+		size_t		readsz;
+
+		if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
+			break;
+		if (readsz != 1)
+			break;
+		if (buf[i] == '\n')
+		{
+			++i;
+			break;
+		}
+	}
+	buf[i] = '\0';
+	return i > 0 ? buf : NULL;
+}
+
+static bool
+Zstd_close(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+
+	if (zstdcs->cstream)
+	{
+		size_t		res,
+					cnt;
+		ZSTD_inBuffer *input = &zstdcs->input;
+		ZSTD_outBuffer *output = &zstdcs->output;
+
+		/* Loop until the compression buffers are fully consumed */
+		for (;;)
+		{
+			output->pos = 0;
+			res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
+			if (ZSTD_isError(res))
+			{
+				zstdcs->zstderror = ZSTD_getErrorName(res);
+				return false;
+			}
+
+			cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+			if (cnt != output->pos)
+			{
+				zstdcs->zstderror = strerror(errno);
+				return false;
+			}
+
+			if (res == 0)
+				break;			/* End of frame */
+		}
+
+		ZSTD_freeCStream(zstdcs->cstream);
+		pg_free(zstdcs->output.dst);
+	}
+
+	if (zstdcs->dstream)
+	{
+		ZSTD_freeDStream(zstdcs->dstream);
+		pg_free(unconstify(void *, zstdcs->input.src));
+	}
+
+	if (fclose(zstdcs->fp) != 0)
+		return false;
+
+	pg_free(zstdcs);
+	return true;
+}
+
+static bool
+Zstd_eof(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+
+	return feof(zstdcs->fp);
+}
+
+static bool
+Zstd_open(const char *path, int fd, const char *mode,
+		  CompressFileHandle *CFH)
+{
+	FILE	   *fp;
+	ZstdCompressorState *zstdcs;
+
+	if (fd >= 0)
+		fp = fdopen(fd, mode);
+	else
+		fp = fopen(path, mode);
+
+	if (fp == NULL)
+		return false;
+
+	zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
+	CFH->private_data = zstdcs;
+	zstdcs->fp = fp;
+
+	if (mode[0] == 'r')
+	{
+		zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
+		zstdcs->dstream = ZSTD_createDStream();
+		if (zstdcs->dstream == NULL)
+			pg_fatal("could not initialize compression library");
+	}
+	else if (mode[0] == 'w' || mode[0] == 'a')
+	{
+		zstdcs->output.size = ZSTD_CStreamOutSize();
+		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+		zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
+		if (zstdcs->cstream == NULL)
+			pg_fatal("could not initialize compression library");
+	}
+	else
+		pg_fatal("unhandled mode");
+
+	return true;
+}
+
+static bool
+Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+{
+	char		fname[MAXPGPATH];
+
+	sprintf(fname, "%s.zst", path);
+	return CFH->open_func(fname, -1, mode, CFH);
+}
+
+static const char *
+Zstd_get_error(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+
+	return zstdcs->zstderror;
+}
+
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH,
+						   const pg_compress_specification compression_spec)
+{
+	CFH->open_func = Zstd_open;
+	CFH->open_write_func = Zstd_open_write;
+	CFH->read_func = Zstd_read;
+	CFH->write_func = Zstd_write;
+	CFH->gets_func = Zstd_gets;
+	CFH->getc_func = Zstd_getc;
+	CFH->close_func = Zstd_close;
+	CFH->eof_func = Zstd_eof;
+	CFH->get_error_func = Zstd_get_error;
+
+	CFH->compression_spec = compression_spec;
+
+	CFH->private_data = NULL;
+}
+
+#endif							/* USE_ZSTD */
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
new file mode 100644
index 00000000000..2aaa6b100b1
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_zstd.h
+ *	 Zstd interface to compress_io.c routines
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	   src/bin/pg_dump/compress_zstd.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef COMPRESS_ZSTD_H
+#define COMPRESS_ZSTD_H
+
+#include "compress_io.h"
+
+extern void InitCompressorZstd(CompressorState *cs,
+		const pg_compress_specification compression_spec);
+extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
+		const pg_compress_specification compression_spec);
+
+#endif /* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build
index b2fb7ac77fd..9d59a106f36 100644
--- a/src/bin/pg_dump/meson.build
+++ b/src/bin/pg_dump/meson.build
@@ -5,6 +5,7 @@ pg_dump_common_sources = files(
   'compress_io.c',
   'compress_lz4.c',
   'compress_none.c',
+  'compress_zstd.c',
   'dumputils.c',
   'parallel.c',
   'pg_backup_archiver.c',
@@ -19,7 +20,7 @@ pg_dump_common_sources = files(
 pg_dump_common = static_library('libpgdump_common',
   pg_dump_common_sources,
   c_pch: pch_postgres_fe_h,
-  dependencies: [frontend_code, libpq, lz4, zlib],
+  dependencies: [frontend_code, libpq, lz4, zlib, zstd],
   kwargs: internal_lib_args,
 )
 
@@ -90,6 +91,7 @@ tests += {
     'env': {
       'GZIP_PROGRAM': gzip.path(),
       'LZ4': program_lz4.found() ? program_lz4.path() : '',
+      'ZSTD': program_zstd.found() ? program_zstd.path() : '',
       'with_icu': icu.found() ? 'yes' : 'no',
     },
     'tests': [
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index ab77e373e91..e8ee6b1ad86 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -2120,7 +2120,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 
 		/*
 		 * Check if the specified archive is a directory. If so, check if
-		 * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
+		 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
 		 */
 		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
 		{
@@ -2131,10 +2131,17 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
 				return AH->format;
 #endif
+
 #ifdef USE_LZ4
 			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
 				return AH->format;
 #endif
+
+#ifdef USE_ZSTD
+			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
+				return AH->format;
+#endif
+
 			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
 					 AH->fSpec);
 			fh = NULL;			/* keep compiler quiet */
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index abaaa3b10e3..2177d5ff425 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH)
 				strlcat(fname, ".gz", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
 				strlcat(fname, ".lz4", sizeof(fname));
+			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+				strlcat(fname, ".zst", sizeof(fname));
 
 			if (stat(fname, &st) == 0)
 				te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 6abbcff6834..05833a48460 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -61,6 +61,7 @@
 #include "fe_utils/string_utils.h"
 #include "getopt_long.h"
 #include "libpq/libpq-fs.h"
+#include "compress_io.h"
 #include "parallel.h"
 #include "pg_backup_db.h"
 #include "pg_backup_utils.h"
@@ -735,18 +736,13 @@ main(int argc, char **argv)
 		pg_fatal("invalid compression specification: %s",
 				 error_detail);
 
-	switch (compression_algorithm)
-	{
-		case PG_COMPRESSION_NONE:
-			/* fallthrough */
-		case PG_COMPRESSION_GZIP:
-			/* fallthrough */
-		case PG_COMPRESSION_LZ4:
-			break;
-		case PG_COMPRESSION_ZSTD:
-			pg_fatal("compression with %s is not yet supported", "ZSTD");
-			break;
-	}
+	error_detail = supports_compression(compression_spec);
+	if (error_detail != NULL)
+		pg_fatal("%s", error_detail);
+
+	if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
+		pg_log_warning("compression option is not currently supported: \"%s\"",
+					   "workers");
 
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 42215f82f7a..74f23ae7f74 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -54,8 +54,9 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir;
 # those lines) to validate that part of the process.
 
 my $supports_icu  = ($ENV{with_icu} eq 'yes');
-my $supports_lz4  = check_pg_config("#define USE_LZ4 1");
 my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1");
+my $supports_lz4  = check_pg_config("#define USE_LZ4 1");
+my $supports_zstd  = check_pg_config("#define USE_ZSTD 1");
 
 my %pgdump_runs = (
 	binary_upgrade => {
@@ -213,6 +214,77 @@ my %pgdump_runs = (
 		},
 	},
 
+	compression_zstd_custom => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump',      '--format=custom',
+			'--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump",
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			"--file=$tempdir/compression_zstd_custom.sql",
+			"$tempdir/compression_zstd_custom.dump",
+		],
+		command_like => {
+			command => [
+				'pg_restore',
+				'-l', "$tempdir/compression_zstd_custom.dump",
+			],
+			expected => qr/Compression: zstd/,
+			name => 'data content is zstd compressed'
+		},
+	},
+
+	compression_zstd_dir => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump',                              '--jobs=2',
+			'--format=directory',                   '--compress=zstd:1',
+			"--file=$tempdir/compression_zstd_dir", 'postgres',
+		],
+		# Give coverage for manually compressed blob.toc files during
+		# restore.
+		compress_cmd => {
+			program => $ENV{'ZSTD'},
+			args    => [
+				'-z', '-f', '--rm',
+				"$tempdir/compression_zstd_dir/blobs.toc",
+				"-o", "$tempdir/compression_zstd_dir/blobs.toc.zst",
+			],
+		},
+		# Verify that data files were compressed
+		glob_patterns => [
+		    "$tempdir/compression_zstd_dir/toc.dat",
+		    "$tempdir/compression_zstd_dir/*.dat.zst",
+		],
+		restore_cmd => [
+			'pg_restore', '--jobs=2',
+			"--file=$tempdir/compression_zstd_dir.sql",
+			"$tempdir/compression_zstd_dir",
+		],
+	},
+
+	compression_zstd_plain => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump', '--format=plain', '--compress=zstd',
+			"--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
+		],
+		# Decompress the generated file to run through the tests.
+		compress_cmd => {
+			program => $ENV{'ZSTD'},
+			args    => [
+				'-d', '-f',
+				"$tempdir/compression_zstd_plain.sql.zst",
+				"-o", "$tempdir/compression_zstd_plain.sql",
+			],
+		},
+	},
+
 	clean => {
 		dump_cmd => [
 			'pg_dump',
@@ -4648,10 +4720,11 @@ foreach my $run (sort keys %pgdump_runs)
 	my $test_key = $run;
 	my $run_db   = 'postgres';
 
-	# Skip command-level tests for gzip/lz4 if there is no support for it.
+	# Skip command-level tests for gzip/lz4/zstd if the tool is not supported
 	if ($pgdump_runs{$run}->{compile_option} &&
 		(($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
-		($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)))
+		($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4) ||
+		($pgdump_runs{$run}->{compile_option} eq 'zstd' && !$supports_zstd)))
 	{
 		note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
 		next;
diff --git a/src/tools/pginclude/cpluspluscheck b/src/tools/pginclude/cpluspluscheck
index 58039934756..10fb51585c9 100755
--- a/src/tools/pginclude/cpluspluscheck
+++ b/src/tools/pginclude/cpluspluscheck
@@ -154,6 +154,7 @@ do
 	test "$f" = src/bin/pg_dump/compress_io.h && continue
 	test "$f" = src/bin/pg_dump/compress_lz4.h && continue
 	test "$f" = src/bin/pg_dump/compress_none.h && continue
+	test "$f" = src/bin/pg_dump/compress_zstd.h && continue
 	test "$f" = src/bin/pg_dump/parallel.h && continue
 	test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue
 	test "$f" = src/bin/pg_dump/pg_dump.h && continue
-- 
2.34.1

>From cead77201ffc76d9f6ea9b853467d846f1618a8a Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 27 Mar 2022 11:55:01 -0500
Subject: [PATCH 2/4] zstd: support long distance mode in pg_dump/basebackup

First proposed here:
20220327205020.gm28...@telsasoft.com
---
 doc/src/sgml/protocol.sgml                    | 10 +++-
 doc/src/sgml/ref/pg_basebackup.sgml           |  4 +-
 doc/src/sgml/ref/pg_dump.sgml                 |  2 +
 src/backend/backup/basebackup_zstd.c          | 12 ++++
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 13 +++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl  |  9 ++-
 src/bin/pg_dump/compress_zstd.c               |  5 ++
 src/bin/pg_dump/t/002_pg_dump.pl              |  3 +-
 src/bin/pg_verifybackup/t/008_untar.pl        |  8 +++
 src/bin/pg_verifybackup/t/010_client_untar.pl |  8 +++
 src/common/compression.c                      | 57 ++++++++++++++++++-
 src/include/common/compression.h              |  2 +
 12 files changed, 127 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 8b5e7b1ad7f..b11d9a6ba35 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2729,7 +2729,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
            level.  Otherwise, it should be a comma-separated list of items,
            each of the form <replaceable>keyword</replaceable> or
            <replaceable>keyword=value</replaceable>. Currently, the supported
-           keywords are <literal>level</literal> and <literal>workers</literal>.
+           keywords are <literal>level</literal>, <literal>long</literal> and
+           <literal>workers</literal>.
           </para>
 
           <para>
@@ -2746,6 +2747,13 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
            <literal>3</literal>).
           </para>
 
+          <para>
+           The <literal>long</literal> keyword enables long-distance matching
+           mode, for improved compression ratio, at the expense of higher memory
+           use.  Long-distance mode is supported only for
+           <literal>zstd</literal>.
+          </para>
+
           <para>
            The <literal>workers</literal> keyword sets the number of threads
            that should be used for parallel compression. Parallel compression
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index db3ad9cd5eb..79d3e657c32 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
         level.  Otherwise, it should be a comma-separated list of items,
         each of the form <literal>keyword</literal> or
         <literal>keyword=value</literal>.
-        Currently, the supported keywords are <literal>level</literal>
-        and <literal>workers</literal>.
+        Currently, the supported keywords are <literal>level</literal>,
+        <literal>long</literal>, and <literal>workers</literal>.
         The detail string cannot be used when the compression method
         is specified as a plain integer.
        </para>
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 8de38e0fd0d..e81e35c13b3 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -681,6 +681,8 @@ PostgreSQL documentation
         as though it had been fed through <application>gzip</application>,
         <application>lz4</application>, or <application>zstd</application>;
         but the default is not to compress.
+        With zstd compression, <literal>long</literal> mode may improve the
+        compression ratio, at the cost of increased memory use.
        </para>
        <para>
         The tar archive format currently does not support compression at all.
diff --git a/src/backend/backup/basebackup_zstd.c b/src/backend/backup/basebackup_zstd.c
index ac6cac178a0..1bb5820c884 100644
--- a/src/backend/backup/basebackup_zstd.c
+++ b/src/backend/backup/basebackup_zstd.c
@@ -118,6 +118,18 @@ bbsink_zstd_begin_backup(bbsink *sink)
 						   compress->workers, ZSTD_getErrorName(ret)));
 	}
 
+	if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+	{
+		ret = ZSTD_CCtx_setParameter(mysink->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 compress->long_distance);
+		if (ZSTD_isError(ret))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("could not set compression flag for %s: %s",
+						   "long", ZSTD_getErrorName(ret)));
+	}
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index fe17d6df4ef..fba391e2a0f 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -106,6 +106,19 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *comp
 					 compress->workers, ZSTD_getErrorName(ret));
 	}
 
+	if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+	{
+		ret = ZSTD_CCtx_setParameter(streamer->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 compress->long_distance);
+		if (ZSTD_isError(ret))
+		{
+			pg_log_error("could not set compression flag for %s: %s",
+						 "long", ZSTD_getErrorName(ret));
+			exit(1);
+		}
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index b60cb78a0d5..4d130a7f944 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -139,7 +139,14 @@ SKIP:
 			'gzip:workers=3',
 			'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
 			'failure on worker count for gzip'
-		],);
+		],
+		[
+			'gzip:long',
+			'invalid compression specification: compression algorithm "gzip" does not support long-distance mode',
+			'failure on long mode for gzip'
+		],
+	);
+
 	for my $cft (@compression_failure_tests)
 	{
 		my $cfail = quotemeta($client_fails . $cft->[1]);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index c19d5262943..c7229ec2922 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -80,6 +80,11 @@ _ZstdCStreamParams(pg_compress_specification compress)
 	_Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_compressionLevel,
 							   compress.level, "level");
 
+	if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
+		_Zstd_CCtx_setParam_or_die(cstream,
+								  ZSTD_c_enableLongDistanceMatching,
+								  compress.long_distance, "long");
+
 	return cstream;
 }
 
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 74f23ae7f74..bb898b06bb4 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -267,11 +267,12 @@ my %pgdump_runs = (
 		],
 	},
 
+	# Exercise long mode for test coverage
 	compression_zstd_plain => {
 		test_key       => 'compression',
 		compile_option => 'zstd',
 		dump_cmd       => [
-			'pg_dump', '--format=plain', '--compress=zstd',
+			'pg_dump', '--format=plain', '--compress=zstd:long',
 			"--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
 		],
 		# Decompress the generated file to run through the tests.
diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl
index 3007bbe8556..05754bc8ec7 100644
--- a/src/bin/pg_verifybackup/t/008_untar.pl
+++ b/src/bin/pg_verifybackup/t/008_untar.pl
@@ -49,6 +49,14 @@ my @test_configuration = (
 		'decompress_program' => $ENV{'ZSTD'},
 		'decompress_flags'   => ['-d'],
 		'enabled'            => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags'       => [ '--compress', 'server-zstd:level=1,long' ],
+		'backup_archive'     => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags'   => ['-d'],
+		'enabled'            => check_pg_config("#define USE_ZSTD 1")
 	});
 
 for my $tc (@test_configuration)
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
index f3aa0f59e29..ac51a174d14 100644
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -50,6 +50,14 @@ my @test_configuration = (
 		'decompress_flags'   => ['-d'],
 		'enabled'            => check_pg_config("#define USE_ZSTD 1")
 	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags' => ['--compress', 'client-zstd:level=1,long'],
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
 	{
 		'compression_method' => 'parallel zstd',
 		'backup_flags'       => [ '--compress', 'client-zstd:workers=3' ],
diff --git a/src/common/compression.c b/src/common/compression.c
index 2d3e56b4d62..713a77c292d 100644
--- a/src/common/compression.c
+++ b/src/common/compression.c
@@ -12,7 +12,7 @@
  * Otherwise, a compression specification is a comma-separated list of items,
  * each having the form keyword or keyword=value.
  *
- * Currently, the only supported keywords are "level" and "workers".
+ * Currently, the supported keywords are "level", "long", and "workers".
  *
  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
  *
@@ -38,6 +38,8 @@
 
 static int	expect_integer_value(char *keyword, char *value,
 								 pg_compress_specification *result);
+static bool expect_boolean_value(char *keyword, char *value,
+								 pg_compress_specification *result);
 
 /*
  * Look up a compression algorithm by name. Returns true and sets *algorithm
@@ -232,6 +234,11 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio
 			result->workers = expect_integer_value(keyword, value, result);
 			result->options |= PG_COMPRESSION_OPTION_WORKERS;
 		}
+		else if (strcmp(keyword, "long") == 0)
+		{
+			result->long_distance = expect_boolean_value(keyword, value, result);
+			result->options |= PG_COMPRESSION_OPTION_LONG_DISTANCE;
+		}
 		else
 			result->parse_error =
 				psprintf(_("unrecognized compression option: \"%s\""), keyword);
@@ -289,6 +296,43 @@ expect_integer_value(char *keyword, char *value, pg_compress_specification *resu
 	return ivalue;
 }
 
+/*
+ * Parse 'value' as an boolean and return the result.
+ *
+ * If parsing fails, set result->parse_error to an appropriate message
+ * and return -1.  The caller must check result->parse_error to determine if
+ * the call was successful.
+ *
+ * Valid values are: yes, no, on, off, 1, 0.
+ *
+ * Inspired by ParseVariableBool().
+ */
+static bool
+expect_boolean_value(char *keyword, char *value, pg_compress_specification *result)
+{
+	if (value == NULL)
+		return true;
+
+	if (pg_strcasecmp(value, "yes") == 0)
+		return true;
+	if (pg_strcasecmp(value, "on") == 0)
+		return true;
+	if (pg_strcasecmp(value, "1") == 0)
+		return true;
+
+	if (pg_strcasecmp(value, "no") == 0)
+		return false;
+	if (pg_strcasecmp(value, "off") == 0)
+		return false;
+	if (pg_strcasecmp(value, "0") == 0)
+		return false;
+
+	result->parse_error =
+		psprintf(_("value for compression option \"%s\" must be a boolean"),
+				 keyword);
+	return false;
+}
+
 /*
  * Returns NULL if the compression specification string was syntactically
  * valid and semantically sensible.  Otherwise, returns an error message.
@@ -354,6 +398,17 @@ validate_compress_specification(pg_compress_specification *spec)
 						get_compress_algorithm_name(spec->algorithm));
 	}
 
+	/*
+	 * Of the compression algorithms that we currently support, only zstd
+	 * supports long-distance mode.
+	 */
+	if ((spec->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0 &&
+		(spec->algorithm != PG_COMPRESSION_ZSTD))
+	{
+		return psprintf(_("compression algorithm \"%s\" does not support long-distance mode"),
+						get_compress_algorithm_name(spec->algorithm));
+	}
+
 	return NULL;
 }
 
diff --git a/src/include/common/compression.h b/src/include/common/compression.h
index b48c173022e..6cf8cf396a8 100644
--- a/src/include/common/compression.h
+++ b/src/include/common/compression.h
@@ -27,6 +27,7 @@ typedef enum pg_compress_algorithm
 } pg_compress_algorithm;
 
 #define PG_COMPRESSION_OPTION_WORKERS		(1 << 0)
+#define PG_COMPRESSION_OPTION_LONG_DISTANCE	(1 << 1)
 
 typedef struct pg_compress_specification
 {
@@ -34,6 +35,7 @@ typedef struct pg_compress_specification
 	unsigned	options;		/* OR of PG_COMPRESSION_OPTION constants */
 	int			level;
 	int			workers;
+	int			long_distance;
 	char	   *parse_error;	/* NULL if parsing was OK, else message */
 } pg_compress_specification;
 
-- 
2.34.1

>From 39b023ef6626dcd303aeee8e9c2418f107244f0f Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Thu, 30 Mar 2023 17:48:57 -0500
Subject: [PATCH 3/4] WIP: pg_dump: support zstd workers

This is a separate commit since it's not essential; the zstd library is
frequently compiled without threading support, so the functionality
isn't very well-tested, and because use of zstd threads might
conceivably play poorly with pg_dump's use of threads under Windows.

Targetting postgres v17.
---
 doc/src/sgml/ref/pg_dump.sgml   | 8 ++++++--
 src/bin/pg_dump/compress_zstd.c | 4 ++++
 src/bin/pg_dump/pg_dump.c       | 4 ----
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index e81e35c13b3..1d55ce05b21 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -681,8 +681,12 @@ PostgreSQL documentation
         as though it had been fed through <application>gzip</application>,
         <application>lz4</application>, or <application>zstd</application>;
         but the default is not to compress.
-        With zstd compression, <literal>long</literal> mode may improve the
-        compression ratio, at the cost of increased memory use.
+        With zstd compression, <literal>long</literal> and
+        <literal>workers</literal> options may be specified to enable long-distance
+        matching and threaded workers, respectively.
+        Long distance mode may improve the compression ratio, at the cost of
+        increased memory use.
+        Threaded workers allow leveraging multiple CPUs during compression.
        </para>
        <para>
         The tar archive format currently does not support compression at all.
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index c7229ec2922..1b821f8ecb1 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -85,6 +85,10 @@ _ZstdCStreamParams(pg_compress_specification compress)
 								  ZSTD_c_enableLongDistanceMatching,
 								  compress.long_distance, "long");
 
+	if (compress.options & PG_COMPRESSION_OPTION_WORKERS)
+		_Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers,
+								   compress.workers, "workers");
+
 	return cstream;
 }
 
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 05833a48460..c0c165c2940 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -740,10 +740,6 @@ main(int argc, char **argv)
 	if (error_detail != NULL)
 		pg_fatal("%s", error_detail);
 
-	if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
-		pg_log_warning("compression option is not currently supported: \"%s\"",
-					   "workers");
-
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
 	 * available, not the others.
-- 
2.34.1

>From a353ce7ecce1804170bbdf8c56313fab92376561 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Wed, 4 Jan 2023 21:21:53 -0600
Subject: [PATCH 4/4] TMP: pg_dump: use Zstd by default, for CI only

//-os-only: linux-meson
---
 .cirrus.yml                      |  9 ++++++++-
 src/bin/pg_dump/compress_zstd.c  |  9 +++++++++
 src/bin/pg_dump/pg_dump.c        |  4 ++--
 src/bin/pg_dump/t/002_pg_dump.pl | 14 +++++++-------
 4 files changed, 26 insertions(+), 10 deletions(-)

diff --git a/.cirrus.yml b/.cirrus.yml
index 5b1747522f9..14402a0ad5c 100644
--- a/.cirrus.yml
+++ b/.cirrus.yml
@@ -267,6 +267,7 @@ LINUX_CONFIGURE_FEATURES: &LINUX_CONFIGURE_FEATURES >-
 LINUX_MESON_FEATURES: &LINUX_MESON_FEATURES >-
   -Dllvm=enabled
   -Duuid=e2fs
+  -Dzstd=enabled
 
 
 # Linux, both 32bit and 64bit
@@ -389,6 +390,9 @@ task:
 
       configure_script: |
         su postgres <<-EOF
+          mkdir subprojects
+          meson wrap install zstd
+          meson configure -D zstd:multithread=enabled --force-fallback-for=zstd
           meson setup \
             --buildtype=debug \
             -Dcassert=true \
@@ -616,7 +620,10 @@ task:
   # Use /DEBUG:FASTLINK to avoid high memory usage during linking
   configure_script: |
     vcvarsall x64
-    meson setup --backend ninja --buildtype debug -Dc_link_args=/DEBUG:FASTLINK -Dcassert=true -Db_pch=true -Dextra_lib_dirs=c:\openssl\1.1\lib -Dextra_include_dirs=c:\openssl\1.1\include -DTAR=%TAR% -DPG_TEST_EXTRA="%PG_TEST_EXTRA%" build
+    mkdir subprojects
+    meson wrap install zstd
+    meson configure -D zstd:multithread=enabled --force-fallback-for=zstd
+    meson setup --backend ninja --buildtype debug -Dc_link_args=/DEBUG:FASTLINK -Dcassert=true -Db_pch=true -Dextra_lib_dirs=c:\openssl\1.1\lib -Dextra_include_dirs=c:\openssl\1.1\include -DTAR=%TAR% -DPG_TEST_EXTRA="%PG_TEST_EXTRA%" -D zstd=enabled build
 
   build_script: |
     vcvarsall x64
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index 1b821f8ecb1..94dc16cff49 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -88,6 +88,15 @@ _ZstdCStreamParams(pg_compress_specification compress)
 	if (compress.options & PG_COMPRESSION_OPTION_WORKERS)
 		_Zstd_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers,
 								   compress.workers, "workers");
+	else
+	{
+		size_t		res;
+
+		res = ZSTD_CCtx_setParameter(cstream, ZSTD_c_nbWorkers, 3);
+		if (ZSTD_isError(res))
+			pg_log_warning("could not set compression parameter: \"%s\": %s",
+					 "workers", ZSTD_getErrorName(res));
+	}
 
 	return cstream;
 }
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index c0c165c2940..f146a29aeff 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -747,8 +747,8 @@ main(int argc, char **argv)
 	if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
 		!user_compression_defined)
 	{
-#ifdef HAVE_LIBZ
-		parse_compress_specification(PG_COMPRESSION_GZIP, NULL,
+#ifdef USE_ZSTD
+		parse_compress_specification(PG_COMPRESSION_ZSTD, NULL,
 									 &compression_spec);
 #else
 		/* Nothing to do in the default case */
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index bb898b06bb4..0a635ae9fc3 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -385,10 +385,10 @@ my %pgdump_runs = (
 		command_like => {
 			command =>
 			  [ 'pg_restore', '-l', "$tempdir/defaults_custom_format.dump", ],
-			expected => $supports_gzip ?
-			qr/Compression: gzip/ :
+			expected => $supports_zstd ?
+			qr/Compression: zstd/ :
 			qr/Compression: none/,
-			name => 'data content is gzip-compressed by default if available',
+			name => 'data content is zstd-compressed by default if available',
 		},
 	},
 
@@ -410,16 +410,16 @@ my %pgdump_runs = (
 		command_like => {
 			command =>
 			  [ 'pg_restore', '-l', "$tempdir/defaults_dir_format", ],
-			expected => $supports_gzip ?
-			qr/Compression: gzip/ :
+			expected => $supports_zstd ?
+			qr/Compression: zstd/ :
 			qr/Compression: none/,
 			name => 'data content is gzip-compressed by default',
 		},
 		glob_patterns => [
 			"$tempdir/defaults_dir_format/toc.dat",
 			"$tempdir/defaults_dir_format/blobs.toc",
-			$supports_gzip ?
-			"$tempdir/defaults_dir_format/*.dat.gz" :
+			$supports_zstd ?
+			"$tempdir/defaults_dir_format/*.dat.zst" :
 			"$tempdir/defaults_dir_format/*.dat",
 		],
 	},
-- 
2.34.1

Reply via email to