From 554b93d117f135ad4ba702d524f1415a385ef8f5 Mon Sep 17 00:00:00 2001
From: Daniel Gustafsson <dgustafsson@postgresql.org>
Date: Wed, 25 Jun 2025 14:38:23 +0200
Subject: [PATCH v6 3/6] pg_dump compression API: open_func

open_func is defined to return true on success and false on error,
with error handling left to the caller.

 * zstd: move stream initialization from the open function to the
   read and write functions, as it can fail with a fatal error.  Also
   make sure to dup() the file descriptor, as none and gzip already do.
 * lz4: make sure to dup() the file descriptor, as none and gzip
   already do.

A TODO for my future self is to move all initialization into the init
function, but that requires passing the mode in order to do it in
a sane manner.  This is left as an exercise for v19.
---
 src/bin/pg_dump/compress_io.c   |  1 +
 src/bin/pg_dump/compress_lz4.c  | 10 +++---
 src/bin/pg_dump/compress_zstd.c | 62 ++++++++++++++++++++++-----------
 3 files changed, 46 insertions(+), 27 deletions(-)

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 8c3d9c911c4..cf5358de741 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -269,6 +269,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 	}
 
 	CFH = InitCompressFileHandle(compression_spec);
+	errno = 0;
 	if (!CFH->open_func(fname, -1, mode, CFH))
 	{
 		free_keep_errno(CFH);
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index 1840d572216..881700f83bb 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -12,6 +12,7 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres_fe.h"
+#include <unistd.h>
 
 #include "compress_lz4.h"
 #include "pg_backup_utils.h"
@@ -707,21 +708,18 @@ static bool
 LZ4Stream_open(const char *path, int fd, const char *mode,
 			   CompressFileHandle *CFH)
 {
-	FILE	   *fp;
 	LZ4State   *state = (LZ4State *) CFH->private_data;
 
 	if (fd >= 0)
-		fp = fdopen(fd, mode);
+		state->fp = fdopen(dup(fd), mode);
 	else
-		fp = fopen(path, mode);
-	if (fp == NULL)
+		state->fp = fopen(path, mode);
+	if (state->fp == NULL)
 	{
 		state->errcode = errno;
 		return false;
 	}
 
-	state->fp = fp;
-
 	return true;
 }
 
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index 7ae93d94f66..d58cc1d5c88 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -13,6 +13,7 @@
  */
 
 #include "postgres_fe.h"
+#include <unistd.h>
 
 #include "compress_zstd.h"
 #include "pg_backup_utils.h"
@@ -268,6 +269,18 @@ Zstd_read(void *ptr, size_t size, CompressFileHandle *CFH)
 	size_t		res,
 				cnt;
 
+	/*
+	 * If this is the first call to the reading function, initialize the
+	 * required data structures.
+	 */
+	if (zstdcs->dstream == NULL)
+	{
+		zstdcs->input.src = pg_malloc0(input_allocated_size);
+		zstdcs->dstream = ZSTD_createDStream();
+		if (zstdcs->dstream == NULL)
+			pg_fatal("could not initialize compression library");
+	}
+
 	output->size = size;
 	output->dst = ptr;
 	output->pos = 0;
@@ -339,6 +352,15 @@ Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 	input->size = size;
 	input->pos = 0;
 
+	if (zstdcs->cstream == NULL)
+	{
+		zstdcs->output.size = ZSTD_CStreamOutSize();
+		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+		zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
+		if (zstdcs->cstream == NULL)
+			pg_fatal("could not initialize compression library");
+	}
+
 	/* Consume all input, to be flushed later */
 	while (input->pos != input->size)
 	{
@@ -460,35 +482,33 @@ Zstd_open(const char *path, int fd, const char *mode,
 	FILE	   *fp;
 	ZstdCompressorState *zstdcs;
 
+	/*
+	 * Reset private_data first so that the handle does not carry a non-NULL
+	 * state pointer on error return.
+	 */
+	CFH->private_data = NULL;
+
+	zstdcs = (ZstdCompressorState *) pg_malloc_extended(sizeof(*zstdcs),
+														MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO);
+	if (!zstdcs)
+	{
+		errno = ENOMEM;
+		return false;
+	}
+
 	if (fd >= 0)
-		fp = fdopen(fd, mode);
+		fp = fdopen(dup(fd), mode);
 	else
 		fp = fopen(path, mode);
 
 	if (fp == NULL)
+	{
+		pg_free(zstdcs);
 		return false;
+	}
 
-	zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
-	CFH->private_data = zstdcs;
 	zstdcs->fp = fp;
-
-	if (mode[0] == 'r')
-	{
-		zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
-		zstdcs->dstream = ZSTD_createDStream();
-		if (zstdcs->dstream == NULL)
-			pg_fatal("could not initialize compression library");
-	}
-	else if (mode[0] == 'w' || mode[0] == 'a')
-	{
-		zstdcs->output.size = ZSTD_CStreamOutSize();
-		zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
-		zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
-		if (zstdcs->cstream == NULL)
-			pg_fatal("could not initialize compression library");
-	}
-	else
-		pg_fatal("unhandled mode \"%s\"", mode);
+	CFH->private_data = zstdcs;
 
 	return true;
 }
-- 
2.39.3 (Apple Git-146)

