On Sat, Feb 25, 2023 at 07:22:27PM -0600, Justin Pryzby wrote:
> On Fri, Feb 24, 2023 at 01:18:40PM -0600, Justin Pryzby wrote:
> > This is a draft patch - review is welcome and would help to get this
> > ready to be considererd for v16, if desired.
> > 
> > I'm going to add this thread to the old CF entry.
> > https://commitfest.postgresql.org/31/2888/
> 
> This resolves cfbot warnings: windows and cppcheck.
> And refactors zstd routines.
> And updates docs.
> And includes some fixes for earlier patches that these patches conflicts
> with/depends on.

This rebases over the TAP and doc fixes to LZ4.
And adds necessary ENV to makefile and meson.
And adds an annoying boilerplate header.
And removes supports_compression(), which is what I think Tomas meant
when referring to "annoying unsupported cases".
And updates zstd.c: fix an off-by-one, allocate in init depending on
readF/writeF, do not reset the input buffer on each iteration, and show
parameter name in errors.

I'd appreciate help checking that this is doing the right things and
works correctly with zstd threaded workers.  The zstd API says: "use one
different context per thread for parallel execution" and "For parallel
execution, use one separate ZSTD_CStream per thread".
https://github.com/facebook/zstd/blob/dev/lib/zstd.h

I understand that to mean that, if pg_dump *itself* were using threads,
then each thread would need to call ZSTD_createCStream().  pg_dump isn't
threaded, so there's nothing special needed, right?

Except that, under Windows, pg_dump -Fd -j actually uses threads instead
of forking.  I *think* that's still safe, since the pg_dump threads are
created *before* calling zstd functions (see _PrintTocData and
_StartData of the custom and directory formats), so it happens naturally
that there's a separate zstd stream for each thread of pg_dump.

-- 
Justin
>From 6a8b88a3dd37d24ebfdaa6a96505476b8a1efe92 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sat, 7 Jan 2023 15:45:06 -0600
Subject: [PATCH 1/3] pg_dump: zstd compression

Previously proposed at: 20201221194924.gi30...@telsasoft.com
---
 doc/src/sgml/ref/pg_dump.sgml         |   8 +-
 src/bin/pg_dump/Makefile              |   2 +
 src/bin/pg_dump/compress_io.c         |  79 ++--
 src/bin/pg_dump/compress_zstd.c       | 515 ++++++++++++++++++++++++++
 src/bin/pg_dump/compress_zstd.h       |   9 +
 src/bin/pg_dump/meson.build           |   2 +
 src/bin/pg_dump/pg_backup_archiver.c  |  28 +-
 src/bin/pg_dump/pg_backup_directory.c |   2 +
 src/bin/pg_dump/pg_dump.c             |  13 -
 src/bin/pg_dump/t/002_pg_dump.pl      |  79 +++-
 src/tools/pginclude/cpluspluscheck    |   1 +
 11 files changed, 654 insertions(+), 84 deletions(-)
 create mode 100644 src/bin/pg_dump/compress_zstd.c
 create mode 100644 src/bin/pg_dump/compress_zstd.h

diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 334e4b7fd14..1fb66be1818 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -330,8 +330,9 @@ PostgreSQL documentation
            machine-readable format that <application>pg_restore</application>
            can read. A directory format archive can be manipulated with
            standard Unix tools; for example, files in an uncompressed archive
-           can be compressed with the <application>gzip</application> or
-           <application>lz4</application> tools.
+           can be compressed with the <application>gzip</application>,
+           <application>lz4</application>, or
+           <application>zstd</application> tools.
            This format is compressed by default using <literal>gzip</literal>
            and also supports parallel dumps.
           </para>
@@ -655,7 +656,8 @@ PostgreSQL documentation
        <para>
         Specify the compression method and/or the compression level to use.
         The compression method can be set to <literal>gzip</literal>,
-        <literal>lz4</literal>, or <literal>none</literal> for no compression.
+        <literal>lz4</literal>, <literal>zstd</literal>,
+        or <literal>none</literal> for no compression.
         A compression detail string can optionally be specified.  If the
         detail string is an integer, it specifies the compression level.
         Otherwise, it should be a comma-separated list of items, each of the
diff --git a/src/bin/pg_dump/Makefile b/src/bin/pg_dump/Makefile
index eb8f59459a1..24de7593a6a 100644
--- a/src/bin/pg_dump/Makefile
+++ b/src/bin/pg_dump/Makefile
@@ -18,6 +18,7 @@ include $(top_builddir)/src/Makefile.global
 
 export GZIP_PROGRAM=$(GZIP)
 export LZ4
+export ZSTD
 export with_icu
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
@@ -29,6 +30,7 @@ OBJS = \
 	compress_io.o \
 	compress_lz4.o \
 	compress_none.o \
+	compress_zstd.o \
 	dumputils.o \
 	parallel.o \
 	pg_backup_archiver.o \
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index ce06f1eac9c..a3c2f36bb67 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -52,8 +52,8 @@
  *
  *	InitDiscoverCompressFileHandle tries to infer the compression by the
  *	filename suffix. If the suffix is not yet known then it tries to simply
- *	open the file and if it fails, it tries to open the same file with the .gz
- *	suffix, and then again with the .lz4 suffix.
+ *	open the file and if it fails, it tries to open the same file with
+ *	compressed suffixes.
  *
  * IDENTIFICATION
  *	   src/bin/pg_dump/compress_io.c
@@ -69,6 +69,7 @@
 #include "compress_io.h"
 #include "compress_lz4.h"
 #include "compress_none.h"
+#include "compress_zstd.h"
 #include "pg_backup_utils.h"
 
 /*----------------------
@@ -76,36 +77,6 @@
  *----------------------
  */
 
-/*
- * Checks whether a compression algorithm is supported.
- *
- * On success returns NULL, otherwise returns a malloc'ed string which can be
- * used by the caller in an error message.
- */
-char *
-supports_compression(const pg_compress_specification compression_spec)
-{
-	const pg_compress_algorithm	algorithm = compression_spec.algorithm;
-	bool						supported = false;
-
-	if (algorithm == PG_COMPRESSION_NONE)
-		supported = true;
-#ifdef HAVE_LIBZ
-	if (algorithm == PG_COMPRESSION_GZIP)
-		supported = true;
-#endif
-#ifdef USE_LZ4
-	if (algorithm == PG_COMPRESSION_LZ4)
-		supported = true;
-#endif
-
-	if (!supported)
-		return psprintf("this build does not support compression with %s",
-						get_compress_algorithm_name(algorithm));
-
-	return NULL;
-}
-
 /*----------------------
  * Compressor API
  *----------------------
@@ -130,6 +101,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
 		InitCompressorGzip(cs, compression_spec);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
 		InitCompressorLZ4(cs, compression_spec);
+	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+		InitCompressorZstd(cs, compression_spec);
 
 	return cs;
 }
@@ -196,20 +169,30 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
 		InitCompressFileHandleGzip(CFH, compression_spec);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
 		InitCompressFileHandleLZ4(CFH, compression_spec);
+	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+		InitCompressFileHandleZstd(CFH, compression_spec);
 
 	return CFH;
 }
 
+static bool						/* true if "path.ext" exists; *fname is replaced by that name */
+check_compressed_file(const char *path, char **fname, const char *ext)
+{
+	free_keep_errno(*fname);	/* release the previously tried name */
+	*fname = psprintf("%s.%s", path, ext);
+	return (access(*fname, F_OK) == 0);
+}
+
 /*
  * Open a file for reading. 'path' is the file to open, and 'mode' should
  * be either "r" or "rb".
  *
  * If the file at 'path' contains the suffix of a supported compression method,
- * currently this includes ".gz" and ".lz4", then this compression will be used
+ * currently this includes ".gz", ".lz4" and ".zst", then this compression will be used
  * throughout. Otherwise the compression will be inferred by iteratively trying
  * to open the file at 'path', first as is, then by appending known compression
  * suffixes. So if you pass "foo" as 'path', this will open either "foo" or
- * "foo.gz" or "foo.lz4", trying in that order.
+ * "foo.{gz,lz4,zst}", trying in that order.
  *
  * On failure, return NULL with an error code in errno.
  */
@@ -237,28 +220,12 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 		/* avoid unused warning if it is not built with compression */
 		if (exists)
 			compression_spec.algorithm = PG_COMPRESSION_NONE;
-#ifdef HAVE_LIBZ
-		if (!exists)
-		{
-			free_keep_errno(fname);
-			fname = psprintf("%s.gz", path);
-			exists = (stat(fname, &st) == 0);
-
-			if (exists)
-				compression_spec.algorithm = PG_COMPRESSION_GZIP;
-		}
-#endif
-#ifdef USE_LZ4
-		if (!exists)
-		{
-			free_keep_errno(fname);
-			fname = psprintf("%s.lz4", path);
-			exists = (stat(fname, &st) == 0);
-
-			if (exists)
-				compression_spec.algorithm = PG_COMPRESSION_LZ4;
-		}
-#endif
+		else if (check_compressed_file(path, &fname, "gz"))
+			compression_spec.algorithm = PG_COMPRESSION_GZIP;
+		else if (check_compressed_file(path, &fname, "lz4"))
+			compression_spec.algorithm = PG_COMPRESSION_LZ4;
+		else if (check_compressed_file(path, &fname, "zst"))
+			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
 	}
 
 	CFH = InitCompressFileHandle(compression_spec);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
new file mode 100644
index 00000000000..d4c54d6a1dd
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -0,0 +1,515 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_zstd.c
+ *	 Routines for archivers to write a Zstd compressed data stream.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	   src/bin/pg_dump/compress_zstd.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "pg_backup_utils.h"
+#include "compress_zstd.h"
+
+#ifndef USE_ZSTD
+
+void
+InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
+{								/* stub compiled when USE_ZSTD is not defined */
+	pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+{								/* stub compiled when USE_ZSTD is not defined */
+	pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+#else
+
+#include <zstd.h>
+
+typedef struct ZstdCompressorState
+{
+	/* Plain stdio stream holding the compressed bytes; set by Zstd_open() */
+	FILE	   *fp;
+
+	ZSTD_CStream *cstream;		/* compression stream, created on the write path */
+	ZSTD_DStream *dstream;		/* decompression stream, created on the read path */
+	ZSTD_outBuffer output;		/* destination buffer passed to libzstd */
+	ZSTD_inBuffer input;		/* source buffer passed to libzstd */
+
+	/* Static error text (ZSTD_getErrorName() or strerror()) set by Zstd_write() and Zstd_close() */
+	const char *zstderror;
+}			ZstdCompressorState;
+
+static ZSTD_CStream *ZstdCStreamParams(pg_compress_specification compress);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+								   const void *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
+
+static void						/* set one libzstd compression parameter; pg_fatal() on failure */
+ZSTD_CCtx_setParam_or_die(ZSTD_CStream *cstream,
+						  ZSTD_cParameter param, int value, const char *paramname)
+{
+	size_t		res = ZSTD_CCtx_setParameter(cstream, param, value);
+
+	/* paramname only labels the parameter in the error message */
+	if (ZSTD_isError(res))
+		pg_fatal("could not set compression parameter: \"%s\": %s",
+				 paramname, ZSTD_getErrorName(res));
+}
+
+/* Create a zstd compression stream configured from the given specification */
+static ZSTD_CStream *
+ZstdCStreamParams(pg_compress_specification compress)
+{
+	ZSTD_CStream *stream = ZSTD_createCStream();
+	bool		set_workers;
+
+	if (!stream)
+		pg_fatal("could not initialize compression library");
+
+	/* The level is always applied; the worker count only when requested */
+	set_workers = (compress.options & PG_COMPRESSION_OPTION_WORKERS) != 0;
+	ZSTD_CCtx_setParam_or_die(stream, ZSTD_c_compressionLevel,
+							  compress.level, "level");
+	if (set_workers)
+		ZSTD_CCtx_setParam_or_die(stream, ZSTD_c_nbWorkers,
+								  compress.workers, "workers");
+	return stream;
+}
+
+/* Compress cs's pending input and pass each non-empty output chunk to cs->writeF; "flush" selects ZSTD_e_end */
+static void
+ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+
+	/* Loop while unconsumed input remains; when flushing, loop until the break below */
+	while (input->pos != input->size || flush)
+	{
+		size_t		res;
+
+		res = ZSTD_compressStream2(zstdcs->cstream, output,
+								   input, flush ? ZSTD_e_end : ZSTD_e_continue);
+
+		if (ZSTD_isError(res))
+			pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+		/*
+		 * Extra paranoia: avoid zero-length chunks, since a zero length chunk
+		 * is the EOF marker in the custom format. This should never happen
+		 * but...
+		 */
+		if (output->pos > 0)
+			cs->writeF(AH, output->dst, output->pos);
+
+		output->pos = 0;		/* chunk handed off; refill the buffer from its start */
+
+		if (res == 0)
+			break;				/* libzstd reports nothing left to flush */
+	}
+}
+
+static void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+
+	if (zstdcs->cstream != NULL)	/* writing: flush what is pending, then drop the stream */
+	{
+		ZstdWriteCommon(AH, cs, true);
+		ZSTD_freeCStream(zstdcs->cstream);
+	}
+
+	if (zstdcs->dstream != NULL)	/* reading: the input buffer is owned by this state */
+	{
+		ZSTD_freeDStream(zstdcs->dstream);
+		pg_free(unconstify(void *, zstdcs->input.src));
+	}
+
+	/* output.dst is used by both directions, so release it exactly once */
+	pg_free(zstdcs->output.dst);
+	pg_free(zstdcs);
+}
+
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+					   const void *data, size_t dLen)
+{
+	ZSTD_inBuffer *in = &((ZstdCompressorState *) cs->private_data)->input;
+
+	/* Point the zstd input buffer at the caller's data, then compress it */
+	in->pos = 0;
+	in->size = dLen;
+	in->src = data;
+	ZstdWriteCommon(AH, cs, false);
+}
+
+/* Fetch compressed chunks via cs->readF, decompress them and pass the result to ahwrite() */
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+	ZSTD_outBuffer *output = &zstdcs->output;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	size_t		input_allocated_size = ZSTD_DStreamInSize();	/* size allocated in InitCompressorZstd */
+	size_t		res;
+
+	for (;;)
+	{
+		size_t		cnt;
+
+		/*
+		 * Read compressed data.  Note that readF can resize the buffer; the
+		 * new size is tracked and used for future loops.
+		 */
+		input->size = input_allocated_size;
+		cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
+		input_allocated_size = input->size;
+		input->size = cnt;
+		input->pos = 0;
+
+		if (cnt == 0)
+			break;				/* readF returned no data: done */
+
+		/* Decompress until this chunk's input is used up or a frame ends */
+		while (input->pos < input->size)
+		{
+			output->pos = 0;
+			res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+			if (ZSTD_isError(res))
+				pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			/* terminate the chunk (dst was allocated with one spare byte) and write it out */
+			((char *) output->dst)[output->pos] = '\0';
+			ahwrite(output->dst, 1, output->pos, AH);
+
+			if (res == 0)
+				break;			/* End of frame */
+		}
+	}
+}
+
+/* Public routine: install the Zstd callbacks on cs and allocate its private state */
+void
+InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec)
+{
+	ZstdCompressorState *zstdcs;
+
+	cs->readData = ReadDataFromArchiveZstd;
+	cs->writeData = WriteDataToArchiveZstd;
+	cs->end = EndCompressorZstd;
+
+	cs->compression_spec = compression_spec;
+
+	cs->private_data = pg_malloc0(sizeof(ZstdCompressorState));
+	zstdcs = (ZstdCompressorState *) cs->private_data;
+
+	if (cs->readF != NULL)		/* reading: decompression stream plus both buffers */
+	{
+		zstdcs->dstream = ZSTD_createDStream();
+		if (zstdcs->dstream == NULL)
+			pg_fatal("could not initialize compression library");
+
+		zstdcs->input.size = ZSTD_DStreamInSize();
+		zstdcs->input.src = pg_malloc(zstdcs->input.size);
+
+		zstdcs->output.size = ZSTD_DStreamOutSize();
+		zstdcs->output.dst = pg_malloc(zstdcs->output.size + 1);	/* +1 for the '\0' added by ReadDataFromArchiveZstd */
+	}
+
+	if (cs->writeF != NULL)		/* writing: input is supplied per call by WriteDataToArchiveZstd */
+	{
+		zstdcs->cstream = ZstdCStreamParams(cs->compression_spec);
+
+		zstdcs->output.size = ZSTD_CStreamOutSize();
+		zstdcs->output.dst = pg_malloc(zstdcs->output.size);	/* NOTE(review): overwrites the buffer above if readF is also set */
+		zstdcs->output.pos = 0;
+	}
+}
+
+/*
+ * Compressed stream API
+ */
+
+static size_t
+Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+	size_t		input_allocated_size = ZSTD_DStreamInSize();	/* size allocated for input->src in Zstd_open() */
+	size_t		res,
+				cnt;
+
+	output->size = size;
+	output->dst = ptr;			/* decompress straight into the caller's buffer */
+	output->pos = 0;
+
+	for (;;)
+	{
+		Assert(input->pos <= input->size);
+		Assert(input->size <= input_allocated_size);
+
+		/* If the input is completely consumed, start back at the beginning */
+		if (input->pos == input->size)
+		{
+			/* input->size is size produced by "fread" */
+			input->size = 0;
+			/* input->pos is position consumed by decompress */
+			input->pos = 0;
+		}
+
+		/* read compressed data if we must produce more input */
+		if (input->pos == input->size)
+		{
+			cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
+			input->size = cnt;
+
+			Assert(cnt >= 0);	/* NOTE(review): always true, cnt is a size_t */
+			Assert(input->size <= input_allocated_size);
+
+			/* If we have no input to consume, we're done */
+			if (cnt == 0)
+				break;
+		}
+
+		while (input->pos < input->size)
+		{
+			/* decompress */
+			res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+
+			if (ZSTD_isError(res))
+				pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+			if (output->pos == output->size)
+				break;			/* No more room for output */
+
+			if (res == 0)
+				break;			/* End of frame */
+		}
+
+		if (output->pos == output->size)
+			break;				/* We read all the data that fits */
+	}
+
+	return output->pos;			/* number of decompressed bytes stored in ptr */
+}
+
+static size_t
+Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	ZSTD_inBuffer *input = &zstdcs->input;
+	ZSTD_outBuffer *output = &zstdcs->output;
+	size_t		res,
+				cnt;
+
+	input->src = ptr;
+	input->size = size;
+	input->pos = 0;
+
+	/* Feed all input to libzstd and write out whatever it emits now; Zstd_close() ends the stream */
+	while (input->pos != input->size)
+	{
+		output->pos = 0;
+		res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
+		if (ZSTD_isError(res))
+		{
+			zstdcs->zstderror = ZSTD_getErrorName(res);
+			return -1;			/* becomes (size_t) -1 in this size_t-returning function */
+		}
+
+		cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+		if (cnt != output->pos)
+		{
+			zstdcs->zstderror = strerror(errno);
+			return -1;			/* short write; message recorded above */
+		}
+	}
+
+	return size;				/* all input was accepted */
+}
+
+static int
+Zstd_getc(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	unsigned char ret;			/* exactly one byte is read; an int here would stay partly uninitialized */
+
+	if (CFH->read_func(&ret, 1, CFH) != 1)
+	{
+		if (feof(zstdcs->fp))
+			pg_fatal("could not read from input file: end of file");
+		else
+			pg_fatal("could not read from input file: %m");
+	}
+	return ret;					/* promoted to int, range 0..255 like getc() */
+}
+
+static char *
+Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
+{
+	int			nstored = 0;
+
+	/*
+	 * Fetch bytes singly through read_func, stopping after a newline, on a
+	 * short read, or once len - 1 bytes are stored.  This is only used to
+	 * read the list of LOs, and the I/O is buffered anyway.
+	 */
+	while (nstored < len - 1)
+	{
+		int			got = CFH->read_func(&buf[nstored], 1, CFH);
+
+		if (got != 1)
+			break;
+		/* Keep the newline in the result, then stop */
+		if (buf[nstored++] == '\n')
+			break;
+	}
+
+	buf[nstored] = '\0';
+	return nstored > 0 ? buf : NULL;
+}
+
+static int
+Zstd_close(CompressFileHandle *CFH)
+{
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	int			result;
+
+	if (zstdcs->cstream)		/* opened for writing: finish the compressed stream first */
+	{
+		size_t		res,
+					cnt;
+		ZSTD_inBuffer *input = &zstdcs->input;
+		ZSTD_outBuffer *output = &zstdcs->output;
+
+		/* Call libzstd with ZSTD_e_end, writing its output, until it returns 0 */
+		for (;;)
+		{
+			output->pos = 0;
+			res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
+			if (ZSTD_isError(res))
+			{
+				zstdcs->zstderror = ZSTD_getErrorName(res);
+				return -1;		/* NOTE(review): returns without fclose() or freeing the state */
+			}
+
+			cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+			if (cnt != output->pos)
+			{
+				zstdcs->zstderror = strerror(errno);
+				return -1;		/* NOTE(review): same early return as above */
+			}
+
+			if (res == 0)
+				break;
+		}
+
+		ZSTD_freeCStream(zstdcs->cstream);
+		pg_free(zstdcs->output.dst);
+	}
+
+	if (zstdcs->dstream)		/* opened for reading: output.dst is the Zstd_read() caller's buffer, not freed here */
+	{
+		ZSTD_freeDStream(zstdcs->dstream);
+		pg_free(unconstify(void *, zstdcs->input.src));
+	}
+
+	result = fclose(zstdcs->fp);
+	pg_free(zstdcs);
+	return result;				/* fclose() status */
+}
+
+static int
+Zstd_eof(CompressFileHandle *CFH)
+{
+	/* Report the end-of-file indicator of the underlying compressed stream */
+	FILE	   *stream = ((ZstdCompressorState *) CFH->private_data)->fp;
+	return feof(stream);
+}
+
+static int
+Zstd_open(const char *path, int fd, const char *mode,
+		  CompressFileHandle *CFH)
+{
+	FILE	   *fp;
+	ZstdCompressorState *zstdcs;
+
+	if (fd >= 0)				/* a valid descriptor takes precedence over path */
+		fp = fdopen(fd, mode);
+	else
+		fp = fopen(path, mode);
+
+	if (fp == NULL)
+		return 1;				/* open failed; no private state has been allocated */
+
+	CFH->private_data = pg_malloc0(sizeof(ZstdCompressorState));
+	zstdcs = (ZstdCompressorState *) CFH->private_data;
+	zstdcs->fp = fp;
+
+	if (mode[0] == 'r')			/* reading: input buffer plus decompression stream */
+	{
+		zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
+		zstdcs->dstream = ZSTD_createDStream();
+		if (zstdcs->dstream == NULL)
+			pg_fatal("could not initialize compression library");
+	}
+	else if (mode[0] == 'w' || mode[0] == 'a')	/* writing: output buffer plus compression stream */
+	{
+		zstdcs->output.size = ZSTD_CStreamOutSize();
+		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+		zstdcs->cstream = ZstdCStreamParams(CFH->compression_spec);
+	}
+	else
+		pg_fatal("unhandled mode");
+
+	return 0;					/* success */
+}
+
+static int
+Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+{
+	char		fname[MAXPGPATH];
+
+	snprintf(fname, sizeof(fname), "%s.zst", path); /* bounded: never writes past fname */
+	return CFH->open_func(fname, -1, mode, CFH);	/* -1: open by name, not by descriptor */
+}
+
+static const char *
+Zstd_get_error(CompressFileHandle *CFH)
+{								/* prefer the message recorded by Zstd_write()/Zstd_close(), else errno */
+	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	return (zstdcs && zstdcs->zstderror) ? zstdcs->zstderror : strerror(errno);
+}
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+{
+	/* No per-file state exists until open_func allocates it */
+	CFH->private_data = NULL;
+	CFH->compression_spec = compression_spec;
+	/* Install the Zstd implementation of each file-handle callback */
+	CFH->open_func = Zstd_open;
+	CFH->open_write_func = Zstd_open_write;
+	CFH->close_func = Zstd_close;
+	CFH->read_func = Zstd_read;
+	CFH->getc_func = Zstd_getc;
+	CFH->gets_func = Zstd_gets;
+	CFH->write_func = Zstd_write;
+	CFH->eof_func = Zstd_eof;
+	CFH->get_error_func = Zstd_get_error;
+}
+
+#endif							/* USE_ZSTD */
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
new file mode 100644
index 00000000000..f36698b4c26
--- /dev/null
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -0,0 +1,9 @@
+#ifndef COMPRESS_ZSTD_H
+#define COMPRESS_ZSTD_H
+
+#include "compress_io.h"
+
+extern void InitCompressorZstd(CompressorState *cs, const pg_compress_specification compression_spec);
+extern void InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec);
+
+#endif /* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/meson.build b/src/bin/pg_dump/meson.build
index 0da476a4c34..c332ef87787 100644
--- a/src/bin/pg_dump/meson.build
+++ b/src/bin/pg_dump/meson.build
@@ -5,6 +5,7 @@ pg_dump_common_sources = files(
   'compress_io.c',
   'compress_lz4.c',
   'compress_none.c',
+  'compress_zstd.c',
   'dumputils.c',
   'parallel.c',
   'pg_backup_archiver.c',
@@ -90,6 +91,7 @@ tests += {
     'env': {
       'GZIP_PROGRAM': gzip.path(),
       'LZ4': program_lz4.found() ? program_lz4.path() : '',
+      'ZSTD': program_zstd.found() ? program_zstd.path() : '',
     },
     'tests': [
       't/001_basic.pl',
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 61ebb8fe85d..6fda84d8774 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -388,10 +388,12 @@ RestoreArchive(Archive *AHX)
 		{
 			if (te->hadDumper && (te->reqs & REQ_DATA) != 0)
 			{
-				char *errmsg = supports_compression(AH->compression_spec);
-				if (errmsg)
+				pg_compress_specification compress_spec;
+				parse_compress_specification(AH->compression_spec.algorithm,
+											 NULL, &compress_spec);
+				if (compress_spec.parse_error != NULL)
 					pg_fatal("cannot restore from compressed archive (%s)",
-							  errmsg);
+							  compress_spec.parse_error);
 				else
 					break;
 			}
@@ -2075,7 +2077,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 
 		/*
 		 * Check if the specified archive is a directory. If so, check if
-		 * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
+		 * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
 		 */
 		if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
 		{
@@ -2086,10 +2088,17 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.gz"))
 				return AH->format;
 #endif
+
 #ifdef USE_LZ4
 			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
 				return AH->format;
 #endif
+
+#ifdef USE_ZSTD
+			if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
+				return AH->format;
+#endif
+
 			pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
 					 AH->fSpec);
 			fh = NULL;			/* keep compiler quiet */
@@ -3674,7 +3683,7 @@ WriteHead(ArchiveHandle *AH)
 void
 ReadHead(ArchiveHandle *AH)
 {
-	char	   *errmsg;
+	pg_compress_specification compress_spec;
 	char		vmaj,
 				vmin,
 				vrev;
@@ -3745,12 +3754,13 @@ ReadHead(ArchiveHandle *AH)
 	else
 		AH->compression_spec.algorithm = PG_COMPRESSION_GZIP;
 
-	errmsg = supports_compression(AH->compression_spec);
-	if (errmsg)
+	parse_compress_specification(AH->compression_spec.algorithm,
+			NULL, &compress_spec);
+	if (compress_spec.parse_error != NULL)
 	{
 		pg_log_warning("archive is compressed, but this installation does not support compression (%s) -- no data will be available",
-						errmsg);
-		pg_free(errmsg);
+						compress_spec.parse_error);
+		pg_free(compress_spec.parse_error);
 	}
 
 	if (AH->version >= K_VERS_1_4)
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 41c2b733e3e..29845340859 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH)
 				strlcat(fname, ".gz", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
 				strlcat(fname, ".lz4", sizeof(fname));
+			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+				strlcat(fname, ".zst", sizeof(fname));
 
 			if (stat(fname, &st) == 0)
 				te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 74d806c77ba..08edeef2e3d 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -710,19 +710,6 @@ main(int argc, char **argv)
 		pg_fatal("invalid compression specification: %s",
 				 error_detail);
 
-	switch (compression_algorithm)
-	{
-		case PG_COMPRESSION_NONE:
-			/* fallthrough */
-		case PG_COMPRESSION_GZIP:
-			/* fallthrough */
-		case PG_COMPRESSION_LZ4:
-			break;
-		case PG_COMPRESSION_ZSTD:
-			pg_fatal("compression with %s is not yet supported", "ZSTD");
-			break;
-	}
-
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
 	 * available, not the others.
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 14cd0d2d503..85dcc144f83 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -54,8 +54,9 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir;
 # those lines) to validate that part of the process.
 
 my $supports_icu  = ($ENV{with_icu} eq 'yes');
-my $supports_lz4  = check_pg_config("#define USE_LZ4 1");
 my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1");
+my $supports_lz4  = check_pg_config("#define USE_LZ4 1");
+my $supports_zstd  = check_pg_config("#define USE_ZSTD 1");
 
 my %pgdump_runs = (
 	binary_upgrade => {
@@ -213,6 +214,77 @@ my %pgdump_runs = (
 		},
 	},
 
+	compression_zstd_custom => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump',      '--format=custom',
+			'--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump",
+			'postgres',
+		],
+		restore_cmd => [
+			'pg_restore',
+			"--file=$tempdir/compression_zstd_custom.sql",
+			"$tempdir/compression_zstd_custom.dump",
+		],
+		command_like => {
+			command => [
+				'pg_restore',
+				'-l', "$tempdir/compression_zstd_custom.dump",
+			],
+			expected => qr/Compression: zstd/,
+			name => 'data content is zstd compressed'
+		},
+	},
+
+	compression_zstd_dir => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump',                              '--jobs=2',
+			'--format=directory',                   '--compress=zstd:1',
+			"--file=$tempdir/compression_zstd_dir", 'postgres',
+		],
+		# Give coverage for manually compressed blob.toc files during
+		# restore.
+		compress_cmd => {
+			program => $ENV{'ZSTD'},
+			args    => [
+				'-z', '-f', '--rm',
+				"$tempdir/compression_zstd_dir/blobs.toc",
+				"-o", "$tempdir/compression_zstd_dir/blobs.toc.zst",
+			],
+		},
+		# Verify that data files were compressed
+		glob_patterns => [
+		    "$tempdir/compression_zstd_dir/toc.dat",
+		    "$tempdir/compression_zstd_dir/*.dat.zst",
+		],
+		restore_cmd => [
+			'pg_restore', '--jobs=2',
+			"--file=$tempdir/compression_zstd_dir.sql",
+			"$tempdir/compression_zstd_dir",
+		],
+	},
+
+	compression_zstd_plain => {
+		test_key       => 'compression',
+		compile_option => 'zstd',
+		dump_cmd       => [
+			'pg_dump', '--format=plain', '--compress=zstd',
+			"--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
+		],
+		# Decompress the generated file to run through the tests.
+		compress_cmd => {
+			program => $ENV{'ZSTD'},
+			args    => [
+				'-d', '-f',
+				"$tempdir/compression_zstd_plain.sql.zst",
+				"-o", "$tempdir/compression_zstd_plain.sql",
+			],
+		},
+	},
+
 	clean => {
 		dump_cmd => [
 			'pg_dump',
@@ -4271,10 +4343,11 @@ foreach my $run (sort keys %pgdump_runs)
 	my $test_key = $run;
 	my $run_db   = 'postgres';
 
-	# Skip command-level tests for gzip/lz4 if there is no support for it.
+	# Skip command-level tests for gzip/lz4/zstd if the tool is not supported
 	if ($pgdump_runs{$run}->{compile_option} &&
 		(($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
-		($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)))
+		($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4) ||
+		($pgdump_runs{$run}->{compile_option} eq 'zstd' && !$supports_zstd)))
 	{
 		note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
 		next;
diff --git a/src/tools/pginclude/cpluspluscheck b/src/tools/pginclude/cpluspluscheck
index 58039934756..10fb51585c9 100755
--- a/src/tools/pginclude/cpluspluscheck
+++ b/src/tools/pginclude/cpluspluscheck
@@ -154,6 +154,7 @@ do
 	test "$f" = src/bin/pg_dump/compress_io.h && continue
 	test "$f" = src/bin/pg_dump/compress_lz4.h && continue
 	test "$f" = src/bin/pg_dump/compress_none.h && continue
+	test "$f" = src/bin/pg_dump/compress_zstd.h && continue
 	test "$f" = src/bin/pg_dump/parallel.h && continue
 	test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue
 	test "$f" = src/bin/pg_dump/pg_dump.h && continue
-- 
2.34.1

>From 002cf327f7309c7d2b94919626b6a972ca875ed4 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Wed, 4 Jan 2023 21:21:53 -0600
Subject: [PATCH 2/3] TMP: pg_dump: use Zstd by default, for CI only

//-os-only: warnings
---
 src/bin/pg_dump/pg_dump.c        |  4 ++--
 src/bin/pg_dump/t/002_pg_dump.pl | 14 +++++++-------
 2 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 08edeef2e3d..2f35b059a75 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -717,8 +717,8 @@ main(int argc, char **argv)
 	if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
 		!user_compression_defined)
 	{
-#ifdef HAVE_LIBZ
-		parse_compress_specification(PG_COMPRESSION_GZIP, NULL,
+#ifdef USE_ZSTD
+		parse_compress_specification(PG_COMPRESSION_ZSTD, NULL,
 									 &compression_spec);
 #else
 		/* Nothing to do in the default case */
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 85dcc144f83..11a28e77f54 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -384,10 +384,10 @@ my %pgdump_runs = (
 		command_like => {
 			command =>
 			  [ 'pg_restore', '-l', "$tempdir/defaults_custom_format.dump", ],
-			expected => $supports_gzip ?
-			qr/Compression: gzip/ :
+			expected => $supports_zstd ?
+			qr/Compression: zstd/ :
 			qr/Compression: none/,
-			name => 'data content is gzip-compressed by default if available',
+			name => 'data content is zstd-compressed by default if available',
 		},
 	},
 
@@ -409,16 +409,16 @@ my %pgdump_runs = (
 		command_like => {
 			command =>
 			  [ 'pg_restore', '-l', "$tempdir/defaults_dir_format", ],
-			expected => $supports_gzip ?
-			qr/Compression: gzip/ :
+			expected => $supports_zstd ?
+			qr/Compression: zstd/ :
 			qr/Compression: none/,
 			name => 'data content is gzip-compressed by default',
 		},
 		glob_patterns => [
 			"$tempdir/defaults_dir_format/toc.dat",
 			"$tempdir/defaults_dir_format/blobs.toc",
-			$supports_gzip ?
-			"$tempdir/defaults_dir_format/*.dat.gz" :
+			$supports_zstd ?
+			"$tempdir/defaults_dir_format/*.dat.zst" :
 			"$tempdir/defaults_dir_format/*.dat",
 		],
 	},
-- 
2.34.1

>From b60bb3ce2cc7e6babc6f8b88a7a5c1f481b79345 Mon Sep 17 00:00:00 2001
From: Justin Pryzby <pryz...@telsasoft.com>
Date: Sun, 27 Mar 2022 11:55:01 -0500
Subject: [PATCH 3/3] zstd: support long distance mode in pg_dump/basebackup

First proposed here:
20220327205020.gm28...@telsasoft.com

//-os-only: freebsd
---
 doc/src/sgml/protocol.sgml                    | 10 +++-
 doc/src/sgml/ref/pg_basebackup.sgml           |  4 +-
 doc/src/sgml/ref/pg_dump.sgml                 |  7 ++-
 src/backend/backup/basebackup_zstd.c          | 12 ++++
 src/bin/pg_basebackup/bbstreamer_zstd.c       | 13 +++++
 src/bin/pg_basebackup/t/010_pg_basebackup.pl  |  9 ++-
 src/bin/pg_dump/compress_zstd.c               |  5 ++
 src/bin/pg_dump/t/002_pg_dump.pl              |  3 +-
 src/bin/pg_verifybackup/t/008_untar.pl        |  8 +++
 src/bin/pg_verifybackup/t/010_client_untar.pl |  8 +++
 src/common/compression.c                      | 57 ++++++++++++++++++-
 src/include/common/compression.h              |  2 +
 12 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 73b7f4432f3..05a887cd092 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -2747,7 +2747,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
            level.  Otherwise, it should be a comma-separated list of items,
            each of the form <replaceable>keyword</replaceable> or
            <replaceable>keyword=value</replaceable>. Currently, the supported
-           keywords are <literal>level</literal> and <literal>workers</literal>.
+           keywords are <literal>level</literal>, <literal>long</literal> and
+           <literal>workers</literal>.
           </para>
 
           <para>
@@ -2764,6 +2765,13 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
            <literal>3</literal>).
           </para>
 
+          <para>
+           The <literal>long</literal> keyword enables long-distance matching
+           mode, for improved compression ratio, at the expense of higher memory
+           use.  Long-distance mode is supported only for
+           <literal>zstd</literal>.
+          </para>
+
           <para>
            The <literal>workers</literal> keyword sets the number of threads
            that should be used for parallel compression. Parallel compression
diff --git a/doc/src/sgml/ref/pg_basebackup.sgml b/doc/src/sgml/ref/pg_basebackup.sgml
index db3ad9cd5eb..79d3e657c32 100644
--- a/doc/src/sgml/ref/pg_basebackup.sgml
+++ b/doc/src/sgml/ref/pg_basebackup.sgml
@@ -424,8 +424,8 @@ PostgreSQL documentation
         level.  Otherwise, it should be a comma-separated list of items,
         each of the form <literal>keyword</literal> or
         <literal>keyword=value</literal>.
-        Currently, the supported keywords are <literal>level</literal>
-        and <literal>workers</literal>.
+        Currently, the supported keywords are <literal>level</literal>,
+        <literal>long</literal>, and <literal>workers</literal>.
         The detail string cannot be used when the compression method
         is specified as a plain integer.
        </para>
diff --git a/doc/src/sgml/ref/pg_dump.sgml b/doc/src/sgml/ref/pg_dump.sgml
index 1fb66be1818..261c18e14cc 100644
--- a/doc/src/sgml/ref/pg_dump.sgml
+++ b/doc/src/sgml/ref/pg_dump.sgml
@@ -678,8 +678,11 @@ PostgreSQL documentation
         individual table-data segments, and the default is to compress using
         <literal>gzip</literal> at a moderate level. For plain text output,
         setting a nonzero compression level causes the entire output file to be compressed,
-        as though it had been fed through <application>gzip</application> or
-        <application>lz4</application>; but the default is not to compress.
+        as though it had been fed through <application>gzip</application>,
+        <application>lz4</application>, or <application>zstd</application>;
+        but the default is not to compress.
+        With zstd compression, <literal>long</literal> mode may improve the
+        compression ratio, at the cost of increased memory use.
        </para>
        <para>
         The tar archive format currently does not support compression at all.
diff --git a/src/backend/backup/basebackup_zstd.c b/src/backend/backup/basebackup_zstd.c
index ac6cac178a0..1bb5820c884 100644
--- a/src/backend/backup/basebackup_zstd.c
+++ b/src/backend/backup/basebackup_zstd.c
@@ -118,6 +118,18 @@ bbsink_zstd_begin_backup(bbsink *sink)
 						   compress->workers, ZSTD_getErrorName(ret)));
 	}
 
+	if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+	{
+		ret = ZSTD_CCtx_setParameter(mysink->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 compress->long_distance);
+		if (ZSTD_isError(ret))
+			ereport(ERROR,
+					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					errmsg("could not set compression flag for %s: %s",
+						   "long", ZSTD_getErrorName(ret)));
+	}
+
 	/*
 	 * We need our own buffer, because we're going to pass different data to
 	 * the next sink than what gets passed to us.
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index fe17d6df4ef..fba391e2a0f 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -106,6 +106,19 @@ bbstreamer_zstd_compressor_new(bbstreamer *next, pg_compress_specification *comp
 					 compress->workers, ZSTD_getErrorName(ret));
 	}
 
+	if ((compress->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0)
+	{
+		ret = ZSTD_CCtx_setParameter(streamer->cctx,
+									 ZSTD_c_enableLongDistanceMatching,
+									 compress->long_distance);
+		if (ZSTD_isError(ret))
+		{
+			pg_log_error("could not set compression flag for %s: %s",
+						 "long", ZSTD_getErrorName(ret));
+			exit(1);
+		}
+	}
+
 	/* Initialize the ZSTD output buffer. */
 	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
 	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
diff --git a/src/bin/pg_basebackup/t/010_pg_basebackup.pl b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
index b60cb78a0d5..4d130a7f944 100644
--- a/src/bin/pg_basebackup/t/010_pg_basebackup.pl
+++ b/src/bin/pg_basebackup/t/010_pg_basebackup.pl
@@ -139,7 +139,14 @@ SKIP:
 			'gzip:workers=3',
 			'invalid compression specification: compression algorithm "gzip" does not accept a worker count',
 			'failure on worker count for gzip'
-		],);
+		],
+		[
+			'gzip:long',
+			'invalid compression specification: compression algorithm "gzip" does not support long-distance mode',
+			'failure on long mode for gzip'
+		],
+	);
+
 	for my $cft (@compression_failure_tests)
 	{
 		my $cfail = quotemeta($client_fails . $cft->[1]);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index d4c54d6a1dd..09378f62939 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -84,6 +84,11 @@ ZstdCStreamParams(pg_compress_specification compress)
 		ZSTD_CCtx_setParam_or_die(cstream, ZSTD_c_nbWorkers,
 								  compress.workers, "workers");
 
+	if (compress.options & PG_COMPRESSION_OPTION_LONG_DISTANCE)
+		ZSTD_CCtx_setParam_or_die(cstream,
+								  ZSTD_c_enableLongDistanceMatching,
+								  compress.long_distance, "long");
+
 	return cstream;
 }
 
diff --git a/src/bin/pg_dump/t/002_pg_dump.pl b/src/bin/pg_dump/t/002_pg_dump.pl
index 11a28e77f54..634acf39840 100644
--- a/src/bin/pg_dump/t/002_pg_dump.pl
+++ b/src/bin/pg_dump/t/002_pg_dump.pl
@@ -267,11 +267,12 @@ my %pgdump_runs = (
 		],
 	},
 
+	# Exercise long mode for test coverage
 	compression_zstd_plain => {
 		test_key       => 'compression',
 		compile_option => 'zstd',
 		dump_cmd       => [
-			'pg_dump', '--format=plain', '--compress=zstd',
+			'pg_dump', '--format=plain', '--compress=zstd:long',
 			"--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
 		],
 		# Decompress the generated file to run through the tests.
diff --git a/src/bin/pg_verifybackup/t/008_untar.pl b/src/bin/pg_verifybackup/t/008_untar.pl
index 3007bbe8556..05754bc8ec7 100644
--- a/src/bin/pg_verifybackup/t/008_untar.pl
+++ b/src/bin/pg_verifybackup/t/008_untar.pl
@@ -49,6 +49,14 @@ my @test_configuration = (
 		'decompress_program' => $ENV{'ZSTD'},
 		'decompress_flags'   => ['-d'],
 		'enabled'            => check_pg_config("#define USE_ZSTD 1")
+	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags'       => [ '--compress', 'server-zstd:level=1,long' ],
+		'backup_archive'     => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags'   => ['-d'],
+		'enabled'            => check_pg_config("#define USE_ZSTD 1")
 	});
 
 for my $tc (@test_configuration)
diff --git a/src/bin/pg_verifybackup/t/010_client_untar.pl b/src/bin/pg_verifybackup/t/010_client_untar.pl
index f3aa0f59e29..ac51a174d14 100644
--- a/src/bin/pg_verifybackup/t/010_client_untar.pl
+++ b/src/bin/pg_verifybackup/t/010_client_untar.pl
@@ -50,6 +50,14 @@ my @test_configuration = (
 		'decompress_flags'   => ['-d'],
 		'enabled'            => check_pg_config("#define USE_ZSTD 1")
 	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags' => ['--compress', 'client-zstd:level=1,long'],
+		'backup_archive' => 'base.tar.zst',
+		'decompress_program' => $ENV{'ZSTD'},
+		'decompress_flags' => [ '-d' ],
+		'enabled' => check_pg_config("#define USE_ZSTD 1")
+	},
 	{
 		'compression_method' => 'parallel zstd',
 		'backup_flags'       => [ '--compress', 'client-zstd:workers=3' ],
diff --git a/src/common/compression.c b/src/common/compression.c
index 2d3e56b4d62..713a77c292d 100644
--- a/src/common/compression.c
+++ b/src/common/compression.c
@@ -12,7 +12,7 @@
  * Otherwise, a compression specification is a comma-separated list of items,
  * each having the form keyword or keyword=value.
  *
- * Currently, the only supported keywords are "level" and "workers".
+ * Currently, the supported keywords are "level", "long", and "workers".
  *
  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
  *
@@ -38,6 +38,8 @@
 
 static int	expect_integer_value(char *keyword, char *value,
 								 pg_compress_specification *result);
+static bool expect_boolean_value(char *keyword, char *value,
+								 pg_compress_specification *result);
 
 /*
  * Look up a compression algorithm by name. Returns true and sets *algorithm
@@ -232,6 +234,11 @@ parse_compress_specification(pg_compress_algorithm algorithm, char *specificatio
 			result->workers = expect_integer_value(keyword, value, result);
 			result->options |= PG_COMPRESSION_OPTION_WORKERS;
 		}
+		else if (strcmp(keyword, "long") == 0)
+		{
+			result->long_distance = expect_boolean_value(keyword, value, result);
+			result->options |= PG_COMPRESSION_OPTION_LONG_DISTANCE;
+		}
 		else
 			result->parse_error =
 				psprintf(_("unrecognized compression option: \"%s\""), keyword);
@@ -289,6 +296,43 @@ expect_integer_value(char *keyword, char *value, pg_compress_specification *resu
 	return ivalue;
 }
 
+/*
+ * Parse 'value' as a boolean and return the result.
+ *
+ * If parsing fails, set result->parse_error to an appropriate message
+ * and return false.  The caller must check result->parse_error to determine
+ * if the call was successful.
+ *
+ * A missing value means true.  Valid values are: yes, no, on, off, 1, 0.
+ *
+ * Inspired by ParseVariableBool().
+ */
+static bool
+expect_boolean_value(char *keyword, char *value, pg_compress_specification *result)
+{
+	if (value == NULL)
+		return true;
+
+	if (pg_strcasecmp(value, "yes") == 0)
+		return true;
+	if (pg_strcasecmp(value, "on") == 0)
+		return true;
+	if (pg_strcasecmp(value, "1") == 0)
+		return true;
+
+	if (pg_strcasecmp(value, "no") == 0)
+		return false;
+	if (pg_strcasecmp(value, "off") == 0)
+		return false;
+	if (pg_strcasecmp(value, "0") == 0)
+		return false;
+
+	result->parse_error =
+		psprintf(_("value for compression option \"%s\" must be a boolean"),
+				 keyword);
+	return false;
+}
+
 /*
  * Returns NULL if the compression specification string was syntactically
  * valid and semantically sensible.  Otherwise, returns an error message.
@@ -354,6 +398,17 @@ validate_compress_specification(pg_compress_specification *spec)
 						get_compress_algorithm_name(spec->algorithm));
 	}
 
+	/*
+	 * Of the compression algorithms that we currently support, only zstd
+	 * supports long-distance mode.
+	 */
+	if ((spec->options & PG_COMPRESSION_OPTION_LONG_DISTANCE) != 0 &&
+		(spec->algorithm != PG_COMPRESSION_ZSTD))
+	{
+		return psprintf(_("compression algorithm \"%s\" does not support long-distance mode"),
+						get_compress_algorithm_name(spec->algorithm));
+	}
+
 	return NULL;
 }
 
diff --git a/src/include/common/compression.h b/src/include/common/compression.h
index b48c173022e..6cf8cf396a8 100644
--- a/src/include/common/compression.h
+++ b/src/include/common/compression.h
@@ -27,6 +27,7 @@ typedef enum pg_compress_algorithm
 } pg_compress_algorithm;
 
 #define PG_COMPRESSION_OPTION_WORKERS		(1 << 0)
+#define PG_COMPRESSION_OPTION_LONG_DISTANCE	(1 << 1)
 
 typedef struct pg_compress_specification
 {
@@ -34,6 +35,7 @@ typedef struct pg_compress_specification
 	unsigned	options;		/* OR of PG_COMPRESSION_OPTION constants */
 	int			level;
 	int			workers;
+	int			long_distance;
 	char	   *parse_error;	/* NULL if parsing was OK, else message */
 } pg_compress_specification;
 
-- 
2.34.1

Reply via email to