From 531a239e3e254e48378d806889396a55b4412a7e Mon Sep 17 00:00:00 2001
From: Daniel Gustafsson <dgustafsson@postgresql.org>
Date: Tue, 1 Jul 2025 16:23:33 +0200
Subject: [PATCH v8] pg_dump: Fix compression API errorhandling

Compression in pg_dump is implemented with an API where multiple
implementations can co-exist and the selected at runtime by the
user.  The API and its implementations have evolved over time, a
few notable commits include bf9aa490db, e9960732a9, 84adc8e20f5,
and 0da243fed0875.  Upon inspection it was found that the error-
handling specified by the API, and the implementations of it, had
a few deficiencies.  This commit fixes the API as well as all the
implementations to align with the new API definition, as well as
a few other implementations issues.  A full list of changes can
be seen below.

This work was initiated by a report of API misuse, which turned
into a larger examination of the API and its implementations.  As
this is an internal API and not externally exposed it can be back-
patched into all affected branches.

 * write_func:
   - Make write_func throw an error on all error conditions.  All
     callers of write_func were already checking for success and
     calling pg_fatal on all errors, so we might as well make the
     API support that case directly with simpler errorhandling as
     a result.

 * open_func:
   - zstd: move stream initialization from the open function to
     the read and write functions as they can have fatal errors.
     Also ensure to dup the file descriptor like none and gzip.
   - lz4: Ensure to dup the file descriptor like none and gzip.

 * close_func:
   - zstd: Ensure to close the file descriptor even if closing
     down the compressor fails, and clean up state allocation on
     fclose failures.  Make sure to capture errors set by fclose.
   - lz4: Ensure to close the file descriptor even if closing
     down the compressor fails, and instead of calling pg_fatal
     log the failures using pg_log_error. Make sure to capture
     errors set by fclose.
   - none: Make sure to catch errors set by fclose.

 * read_func / gets_func:
   - Make read_func unconditionally return the number of read
     bytes instead of making it optional per implementation.
   - lz4: Make sure to call throw an error and not return -1
   - gzip: gzread returning zero cannot be assumed to indicate
     EOF as it is documented to return zero for some types of
     errors.
   - lz4, zstd: Convert the _read_internal helper functions to
     not call pg_fatal on errors to be able to handle gets_func
     returning NULL on error.

 * getc_func:
   - zstd: Use an unsigned char rather than an int to read char
     into.

 * LZ4Stream_init:
   - Make sure to not switch to inited state until we know that
     initialization succeeded and reset errno just in case.

On top of these changes there are minor comment cleanups and
improvements as well as an attempt to consistently reset errno
in codepaths where it is inspected.

Author: Tom Lane <tgl@sss.pgh.pa.us>
Author: Daniel Gustafsson <daniel@yesql.se>
Reported-by: Evgeniy Gorbanev <gorbanyoves@basealt.ru>
Discussion: https://postgr.es/m/517794.1750082166@sss.pgh.pa.us
Backpatch-through: 16
---
 src/bin/pg_dump/compress_gzip.c       |  35 +++++--
 src/bin/pg_dump/compress_io.c         |   2 +
 src/bin/pg_dump/compress_io.h         |  15 +--
 src/bin/pg_dump/compress_lz4.c        |  92 +++++++++-------
 src/bin/pg_dump/compress_none.c       |  29 +++---
 src/bin/pg_dump/compress_zstd.c       | 144 ++++++++++++++++----------
 src/bin/pg_dump/pg_backup_archiver.c  |   4 +-
 src/bin/pg_dump/pg_backup_directory.c |  52 ++--------
 8 files changed, 208 insertions(+), 165 deletions(-)

diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 5a30ebf9bf5..4a067e1402c 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -251,34 +251,49 @@ InitCompressorGzip(CompressorState *cs,
  *----------------------
  */
 
-static bool
-Gzip_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
+static size_t
+Gzip_read(void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	gzFile		gzfp = (gzFile) CFH->private_data;
 	int			gzret;
 
 	gzret = gzread(gzfp, ptr, size);
-	if (gzret <= 0 && !gzeof(gzfp))
+
+	/*
+	 * gzread returns zero on EOF as well as some error conditions, and less
+	 * than zero on other error conditions, so we need to inspect for EOF on
+	 * zero.
+	 */
+	if (gzret <= 0)
 	{
 		int			errnum;
-		const char *errmsg = gzerror(gzfp, &errnum);
+		const char *errmsg;
+
+		if (gzret == 0 && gzeof(gzfp))
+			return 0;
+
+		errmsg = gzerror(gzfp, &errnum);
 
 		pg_fatal("could not read from input file: %s",
 				 errnum == Z_ERRNO ? strerror(errno) : errmsg);
 	}
 
-	if (rsize)
-		*rsize = (size_t) gzret;
-
-	return true;
+	return (size_t) gzret;
 }
 
-static bool
+static void
 Gzip_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	gzFile		gzfp = (gzFile) CFH->private_data;
+	int			errnum;
+	const char *errmsg;
 
-	return gzwrite(gzfp, ptr, size) > 0;
+	if (gzwrite(gzfp, ptr, size) != size)
+	{
+		errmsg = gzerror(gzfp, &errnum);
+		pg_fatal("could not write to file: %s",
+				 errnum == Z_ERRNO ? strerror(errno) : errmsg);
+	}
 }
 
 static int
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 8c3d9c911c4..9cadc6f2a3f 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -269,6 +269,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 	}
 
 	CFH = InitCompressFileHandle(compression_spec);
+	errno = 0;
 	if (!CFH->open_func(fname, -1, mode, CFH))
 	{
 		free_keep_errno(CFH);
@@ -289,6 +290,7 @@ EndCompressFileHandle(CompressFileHandle *CFH)
 {
 	bool		ret = false;
 
+	errno = 0;
 	if (CFH->private_data)
 		ret = CFH->close_func(CFH);
 
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index db9b38744c8..82f5c71f764 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -123,21 +123,22 @@ struct CompressFileHandle
 									CompressFileHandle *CFH);
 
 	/*
-	 * Read 'size' bytes of data from the file and store them into 'ptr'.
-	 * Optionally it will store the number of bytes read in 'rsize'.
+	 * Read up to 'size' bytes of data from the file and store them into
+	 * 'ptr'.
 	 *
-	 * Returns true on success and throws an internal error otherwise.
+	 * Returns number of bytes read (this might be less than 'size' if EOF was
+	 * reached).  Exits via pg_fatal all for error conditions.
 	 */
-	bool		(*read_func) (void *ptr, size_t size, size_t *rsize,
+	size_t		(*read_func) (void *ptr, size_t size,
 							  CompressFileHandle *CFH);
 
 	/*
 	 * Write 'size' bytes of data into the file from 'ptr'.
 	 *
-	 * Returns true on success and false on error.
+	 * Returns nothing, exits via pg_fatal for all error conditions.
 	 */
-	bool		(*write_func) (const void *ptr, size_t size,
-							   struct CompressFileHandle *CFH);
+	void		(*write_func) (const void *ptr, size_t size,
+							   CompressFileHandle *CFH);
 
 	/*
 	 * Read at most size - 1 characters from the compress file handle into
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e99f0cad71f..e2f7c468293 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -12,6 +12,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres_fe.h"
+#include <unistd.h>
 
 #include "compress_lz4.h"
 #include "pg_backup_utils.h"
@@ -358,7 +359,6 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 		return true;
 
 	state->compressing = compressing;
-	state->inited = true;
 
 	/* When compressing, write LZ4 header to the output stream. */
 	if (state->compressing)
@@ -367,6 +367,7 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 		if (!LZ4State_compression_init(state))
 			return false;
 
+		errno = 0;
 		if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
 		{
 			errno = (errno) ? errno : ENOSPC;
@@ -390,6 +391,7 @@ LZ4Stream_init(LZ4State *state, int size, bool compressing)
 		state->overflowlen = 0;
 	}
 
+	state->inited = true;
 	return true;
 }
 
@@ -457,7 +459,11 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 
 	/* Lazy init */
 	if (!LZ4Stream_init(state, size, false /* decompressing */ ))
+	{
+		pg_log_error("unable to initialize LZ4 library: %s",
+					 LZ4F_getErrorName(state->errcode));
 		return -1;
+	}
 
 	/* No work needs to be done for a zero-sized output buffer */
 	if (size <= 0)
@@ -484,7 +490,10 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 
 		rsize = fread(readbuf, 1, size, state->fp);
 		if (rsize < size && !feof(state->fp))
+		{
+			pg_log_error("could not read from input file: %m");
 			return -1;
+		}
 
 		rp = (char *) readbuf;
 		rend = (char *) readbuf + rsize;
@@ -501,6 +510,8 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 			if (LZ4F_isError(status))
 			{
 				state->errcode = status;
+				pg_log_error("could not read from input file: %s",
+							 LZ4F_getErrorName(state->errcode));
 				return -1;
 			}
 
@@ -558,7 +569,7 @@ LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 /*
  * Compress size bytes from ptr and write them to the stream.
  */
-static bool
+static void
 LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	LZ4State   *state = (LZ4State *) CFH->private_data;
@@ -567,7 +578,8 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 
 	/* Lazy init */
 	if (!LZ4Stream_init(state, size, true))
-		return false;
+		pg_fatal("unable to initialize LZ4 library: %s",
+				 LZ4F_getErrorName(state->errcode));
 
 	while (remaining > 0)
 	{
@@ -578,28 +590,24 @@ LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 		status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
 									 ptr, chunk, NULL);
 		if (LZ4F_isError(status))
-		{
-			state->errcode = status;
-			return false;
-		}
+			pg_fatal("error during writing: %s", LZ4F_getErrorName(status));
 
+		errno = 0;
 		if (fwrite(state->buffer, 1, status, state->fp) != status)
 		{
 			errno = (errno) ? errno : ENOSPC;
-			return false;
+			pg_fatal("error during writing: %m");
 		}
 
 		ptr = ((const char *) ptr) + chunk;
 	}
-
-	return true;
 }
 
 /*
  * fread() equivalent implementation for LZ4 compressed files.
  */
-static bool
-LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
+static size_t
+LZ4Stream_read(void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	LZ4State   *state = (LZ4State *) CFH->private_data;
 	int			ret;
@@ -607,10 +615,7 @@ LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
 	if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
 		pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
 
-	if (rsize)
-		*rsize = (size_t) ret;
-
-	return true;
+	return (size_t) ret;
 }
 
 /*
@@ -643,11 +648,13 @@ LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
 	int			ret;
 
 	ret = LZ4Stream_read_internal(state, ptr, size - 1, true);
-	if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
-		pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
 
-	/* Done reading */
-	if (ret == 0)
+	/*
+	 * LZ4Stream_read_internal returning 0 or -1 means that it was either an
+	 * EOF or an error, but gets_func is defined to return NULL in either case
+	 * so we can treat both the same here.
+	 */
+	if (ret <= 0)
 		return NULL;
 
 	/*
@@ -669,6 +676,7 @@ LZ4Stream_close(CompressFileHandle *CFH)
 	FILE	   *fp;
 	LZ4State   *state = (LZ4State *) CFH->private_data;
 	size_t		status;
+	int			ret;
 
 	fp = state->fp;
 	if (state->inited)
@@ -677,25 +685,31 @@ LZ4Stream_close(CompressFileHandle *CFH)
 		{
 			status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
 			if (LZ4F_isError(status))
-				pg_fatal("could not end compression: %s",
-						 LZ4F_getErrorName(status));
-			else if (fwrite(state->buffer, 1, status, state->fp) != status)
 			{
-				errno = (errno) ? errno : ENOSPC;
-				WRITE_ERROR_EXIT;
+				pg_log_error("could not end compression: %s",
+							 LZ4F_getErrorName(status));
+			}
+			else
+			{
+				errno = 0;
+				if (fwrite(state->buffer, 1, status, state->fp) != status)
+				{
+					errno = (errno) ? errno : ENOSPC;
+					pg_log_error("could not write to output file: %m");
+				}
 			}
 
 			status = LZ4F_freeCompressionContext(state->ctx);
 			if (LZ4F_isError(status))
-				pg_fatal("could not end compression: %s",
-						 LZ4F_getErrorName(status));
+				pg_log_error("could not end compression: %s",
+							 LZ4F_getErrorName(status));
 		}
 		else
 		{
 			status = LZ4F_freeDecompressionContext(state->dtx);
 			if (LZ4F_isError(status))
-				pg_fatal("could not end decompression: %s",
-						 LZ4F_getErrorName(status));
+				pg_log_error("could not end decompression: %s",
+							 LZ4F_getErrorName(status));
 			pg_free(state->overflowbuf);
 		}
 
@@ -703,29 +717,35 @@ LZ4Stream_close(CompressFileHandle *CFH)
 	}
 
 	pg_free(state);
+	CFH->private_data = NULL;
 
-	return fclose(fp) == 0;
+	errno = 0;
+	ret = fclose(fp);
+	if (ret != 0)
+	{
+		pg_log_error("could not close file: %m");
+		return false;
+	}
+
+	return true;
 }
 
 static bool
 LZ4Stream_open(const char *path, int fd, const char *mode,
 			   CompressFileHandle *CFH)
 {
-	FILE	   *fp;
 	LZ4State   *state = (LZ4State *) CFH->private_data;
 
 	if (fd >= 0)
-		fp = fdopen(fd, mode);
+		state->fp = fdopen(dup(fd), mode);
 	else
-		fp = fopen(path, mode);
-	if (fp == NULL)
+		state->fp = fopen(path, mode);
+	if (state->fp == NULL)
 	{
 		state->errcode = errno;
 		return false;
 	}
 
-	state->fp = fp;
-
 	return true;
 }
 
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index 3fc89c99854..9a24ef96cd0 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -83,35 +83,31 @@ InitCompressorNone(CompressorState *cs,
  * Private routines
  */
 
-static bool
-read_none(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
+static size_t
+read_none(void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	FILE	   *fp = (FILE *) CFH->private_data;
 	size_t		ret;
 
-	if (size == 0)
-		return true;
-
 	ret = fread(ptr, 1, size, fp);
-	if (ret != size && !feof(fp))
+	if (ferror(fp))
 		pg_fatal("could not read from input file: %m");
 
-	if (rsize)
-		*rsize = ret;
-
-	return true;
+	return ret;
 }
 
-static bool
+static void
 write_none(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	size_t		ret;
 
+	errno = 0;
 	ret = fwrite(ptr, 1, size, (FILE *) CFH->private_data);
 	if (ret != size)
-		return false;
-
-	return true;
+	{
+		errno = (errno) ? errno : ENOSPC;
+		pg_fatal("coud not write to file: %m");
+	}
 }
 
 static const char *
@@ -153,7 +149,12 @@ close_none(CompressFileHandle *CFH)
 	CFH->private_data = NULL;
 
 	if (fp)
+	{
+		errno = 0;
 		ret = fclose(fp);
+		if (ret != 0)
+			pg_log_error("could not close file: %m");
+	}
 
 	return ret == 0;
 }
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index cb595b10c2d..e24d45e1bbe 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -13,6 +13,7 @@
  */
 
 #include "postgres_fe.h"
+#include <unistd.h>
 
 #include "compress_zstd.h"
 #include "pg_backup_utils.h"
@@ -258,8 +259,8 @@ InitCompressorZstd(CompressorState *cs,
  * Compressed stream API
  */
 
-static bool
-Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
+static size_t
+Zstd_read_internal(void *ptr, size_t size, CompressFileHandle *CFH, bool exit_on_error)
 {
 	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
 	ZSTD_inBuffer *input = &zstdcs->input;
@@ -268,6 +269,22 @@ Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
 	size_t		res,
 				cnt;
 
+	/*
+	 * If this is the first call to the reading function, initialize the
+	 * required datastructures.
+	 */
+	if (zstdcs->dstream == NULL)
+	{
+		zstdcs->input.src = pg_malloc0(input_allocated_size);
+		zstdcs->dstream = ZSTD_createDStream();
+		if (zstdcs->dstream == NULL)
+		{
+			if (exit_on_error)
+				pg_fatal("could not initialize compression library");
+			return -1;
+		}
+	}
+
 	output->size = size;
 	output->dst = ptr;
 	output->pos = 0;
@@ -292,6 +309,13 @@ Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
 		if (input->pos == input->size)
 		{
 			cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
+			if (ferror(zstdcs->fp))
+			{
+				if (exit_on_error)
+					pg_fatal("could not read from input file: %m");
+				return -1;
+			}
+
 			input->size = cnt;
 
 			Assert(cnt <= input_allocated_size);
@@ -307,7 +331,11 @@ Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
 			res = ZSTD_decompressStream(zstdcs->dstream, output, input);
 
 			if (ZSTD_isError(res))
-				pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+			{
+				if (exit_on_error)
+					pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+				return -1;
+			}
 
 			if (output->pos == output->size)
 				break;			/* No more room for output */
@@ -320,13 +348,10 @@ Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
 			break;				/* We read all the data that fits */
 	}
 
-	if (rdsize != NULL)
-		*rdsize = output->pos;
-
-	return true;
+	return output->pos;
 }
 
-static bool
+static void
 Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
 	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
@@ -339,41 +364,40 @@ Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 	input->size = size;
 	input->pos = 0;
 
+	if (zstdcs->cstream == NULL)
+	{
+		zstdcs->output.size = ZSTD_CStreamOutSize();
+		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+		zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
+		if (zstdcs->cstream == NULL)
+			pg_fatal("could not initialize compression library");
+	}
+
 	/* Consume all input, to be flushed later */
 	while (input->pos != input->size)
 	{
 		output->pos = 0;
 		res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
 		if (ZSTD_isError(res))
-		{
-			zstdcs->zstderror = ZSTD_getErrorName(res);
-			return false;
-		}
+			pg_fatal("could not write to file: %s", ZSTD_getErrorName(res));
 
+		errno = 0;
 		cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
 		if (cnt != output->pos)
 		{
-			zstdcs->zstderror = strerror(errno);
-			return false;
+			errno = (errno) ? errno : ENOSPC;
+			pg_fatal("could not write to file: %m");
 		}
 	}
-
-	return size;
 }
 
 static int
 Zstd_getc(CompressFileHandle *CFH)
 {
-	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
-	int			ret;
+	unsigned char ret;
 
-	if (CFH->read_func(&ret, 1, NULL, CFH) != 1)
-	{
-		if (feof(zstdcs->fp))
-			pg_fatal("could not read from input file: end of file");
-		else
-			pg_fatal("could not read from input file: %m");
-	}
+	if (CFH->read_func(&ret, 1, CFH) != 1)
+		pg_fatal("could not read from input file: end of file");
 	return ret;
 }
 
@@ -390,11 +414,7 @@ Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
 	 */
 	for (i = 0; i < len - 1; ++i)
 	{
-		size_t		readsz;
-
-		if (!CFH->read_func(&buf[i], 1, &readsz, CFH))
-			break;
-		if (readsz != 1)
+		if (Zstd_read_internal(&buf[i], 1, CFH, false) != 1)
 			break;
 		if (buf[i] == '\n')
 		{
@@ -406,10 +426,17 @@ Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
 	return i > 0 ? buf : NULL;
 }
 
+static size_t
+Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
+{
+	return Zstd_read_internal(ptr, size, CFH, true);
+}
+
 static bool
 Zstd_close(CompressFileHandle *CFH)
 {
 	ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+	bool		success = true;
 
 	if (zstdcs->cstream)
 	{
@@ -426,14 +453,18 @@ Zstd_close(CompressFileHandle *CFH)
 			if (ZSTD_isError(res))
 			{
 				zstdcs->zstderror = ZSTD_getErrorName(res);
-				return false;
+				success = false;
+				break;
 			}
 
+			errno = 0;
 			cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
 			if (cnt != output->pos)
 			{
+				errno = (errno) ? errno : ENOSPC;
 				zstdcs->zstderror = strerror(errno);
-				return false;
+				success = false;
+				break;
 			}
 
 			if (res == 0)
@@ -450,11 +481,16 @@ Zstd_close(CompressFileHandle *CFH)
 		pg_free(unconstify(void *, zstdcs->input.src));
 	}
 
+	errno = 0;
 	if (fclose(zstdcs->fp) != 0)
-		return false;
+	{
+		zstdcs->zstderror = strerror(errno);
+		success = false;
+	}
 
 	pg_free(zstdcs);
-	return true;
+	CFH->private_data = NULL;
+	return success;
 }
 
 static bool
@@ -472,35 +508,33 @@ Zstd_open(const char *path, int fd, const char *mode,
 	FILE	   *fp;
 	ZstdCompressorState *zstdcs;
 
+	/*
+	 * Clear state storage to avoid having the fd point to non-NULL memory on
+	 * error return.
+	 */
+	CFH->private_data = NULL;
+
+	zstdcs = (ZstdCompressorState *) pg_malloc_extended(sizeof(*zstdcs),
+														MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
+	if (!zstdcs)
+	{
+		errno = ENOMEM;
+		return false;
+	}
+
 	if (fd >= 0)
-		fp = fdopen(fd, mode);
+		fp = fdopen(dup(fd), mode);
 	else
 		fp = fopen(path, mode);
 
 	if (fp == NULL)
+	{
+		pg_free(zstdcs);
 		return false;
+	}
 
-	zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
-	CFH->private_data = zstdcs;
 	zstdcs->fp = fp;
-
-	if (mode[0] == 'r')
-	{
-		zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
-		zstdcs->dstream = ZSTD_createDStream();
-		if (zstdcs->dstream == NULL)
-			pg_fatal("could not initialize compression library");
-	}
-	else if (mode[0] == 'w' || mode[0] == 'a')
-	{
-		zstdcs->output.size = ZSTD_CStreamOutSize();
-		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
-		zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
-		if (zstdcs->cstream == NULL)
-			pg_fatal("could not initialize compression library");
-	}
-	else
-		pg_fatal("unhandled mode \"%s\"", mode);
+	CFH->private_data = zstdcs;
 
 	return true;
 }
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index 197c1295d93..9145598ff33 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -1868,8 +1868,8 @@ ahwrite(const void *ptr, size_t size, size_t nmemb, ArchiveHandle *AH)
 	{
 		CompressFileHandle *CFH = (CompressFileHandle *) AH->OF;
 
-		if (CFH->write_func(ptr, size * nmemb, CFH))
-			bytes_written = size * nmemb;
+		CFH->write_func(ptr, size * nmemb, CFH);
+		bytes_written = size * nmemb;
 	}
 
 	if (bytes_written != size * nmemb)
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index bc2a2fb4797..94d401d8a4e 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -316,15 +316,9 @@ _WriteData(ArchiveHandle *AH, const void *data, size_t dLen)
 	lclContext *ctx = (lclContext *) AH->formatData;
 	CompressFileHandle *CFH = ctx->dataFH;
 
-	errno = 0;
-	if (dLen > 0 && !CFH->write_func(data, dLen, CFH))
-	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		pg_fatal("could not write to output file: %s",
-				 CFH->get_error_func(CFH));
-	}
+	if (dLen <= 0)
+		return;
+	CFH->write_func(data, dLen, CFH);
 }
 
 /*
@@ -351,7 +345,7 @@ _EndData(ArchiveHandle *AH, TocEntry *te)
 static void
 _PrintFileData(ArchiveHandle *AH, char *filename)
 {
-	size_t		cnt = 0;
+	size_t		cnt;
 	char	   *buf;
 	size_t		buflen;
 	CompressFileHandle *CFH;
@@ -366,7 +360,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	buflen = DEFAULT_IO_BUFFER_SIZE;
 	buf = pg_malloc(buflen);
 
-	while (CFH->read_func(buf, buflen, &cnt, CFH) && cnt > 0)
+	while ((cnt = CFH->read_func(buf, buflen, CFH)) > 0)
 	{
 		ahwrite(buf, 1, cnt, AH);
 	}
@@ -470,16 +464,7 @@ _WriteByte(ArchiveHandle *AH, const int i)
 	lclContext *ctx = (lclContext *) AH->formatData;
 	CompressFileHandle *CFH = ctx->dataFH;
 
-	errno = 0;
-	if (!CFH->write_func(&c, 1, CFH))
-	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		pg_fatal("could not write to output file: %s",
-				 CFH->get_error_func(CFH));
-	}
-
+	CFH->write_func(&c, 1, CFH);
 	return 1;
 }
 
@@ -508,15 +493,7 @@ _WriteBuf(ArchiveHandle *AH, const void *buf, size_t len)
 	lclContext *ctx = (lclContext *) AH->formatData;
 	CompressFileHandle *CFH = ctx->dataFH;
 
-	errno = 0;
-	if (!CFH->write_func(buf, len, CFH))
-	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		pg_fatal("could not write to output file: %s",
-				 CFH->get_error_func(CFH));
-	}
+	CFH->write_func(buf, len, CFH);
 }
 
 /*
@@ -531,10 +508,10 @@ _ReadBuf(ArchiveHandle *AH, void *buf, size_t len)
 	CompressFileHandle *CFH = ctx->dataFH;
 
 	/*
-	 * If there was an I/O error, we already exited in readF(), so here we
-	 * exit on short reads.
+	 * We do not expect a short read, so fail if we get one.  The read_func
+	 * already dealt with any outright I/O error.
 	 */
-	if (!CFH->read_func(buf, len, NULL, CFH))
+	if (CFH->read_func(buf, len, CFH) != len)
 		pg_fatal("could not read from input file: end of file");
 }
 
@@ -677,14 +654,7 @@ _EndLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 
 	/* register the LO in blobs_NNN.toc */
 	len = snprintf(buf, sizeof(buf), "%u blob_%u.dat\n", oid, oid);
-	if (!CFH->write_func(buf, len, CFH))
-	{
-		/* if write didn't set errno, assume problem is no disk space */
-		if (errno == 0)
-			errno = ENOSPC;
-		pg_fatal("could not write to LOs TOC file: %s",
-				 CFH->get_error_func(CFH));
-	}
+	CFH->write_func(buf, len, CFH);
 }
 
 /*
-- 
2.39.3 (Apple Git-146)

