From 64ad5bbfd84927ae933a282181b98bc4dd768508 Mon Sep 17 00:00:00 2001
From: Jeevan Ladhe <jeevan.ladhe@enterprisedb.com>
Date: Wed, 16 Feb 2022 22:51:47 +0530
Subject: [PATCH 4/4] ZSTD: add client-side decompression support.

ZSTD decompression of a backup compressed on the server can be
performed on the client using pg_basebackup -Fp --compress server-lz4.

Example:
pg_basebackup -D /tmp/zstd_C_D -Fp -Xfetch --compress=server-zstd:7
---
 src/bin/pg_basebackup/bbstreamer.h       |   1 +
 src/bin/pg_basebackup/bbstreamer_zstd.c  | 133 +++++++++++++++++++++++
 src/bin/pg_basebackup/pg_basebackup.c    |   2 +
 src/bin/pg_verifybackup/t/009_extract.pl |   5 +
 4 files changed, 141 insertions(+)
 mode change 100644 => 100755 src/bin/pg_verifybackup/t/009_extract.pl

diff --git a/src/bin/pg_basebackup/bbstreamer.h b/src/bin/pg_basebackup/bbstreamer.h
index bfc624a863..02d4c05df6 100644
--- a/src/bin/pg_basebackup/bbstreamer.h
+++ b/src/bin/pg_basebackup/bbstreamer.h
@@ -211,6 +211,7 @@ extern bbstreamer *bbstreamer_lz4_compressor_new(bbstreamer *next,
 extern bbstreamer *bbstreamer_lz4_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_zstd_compressor_new(bbstreamer *next,
 												  int compresslevel);
+extern bbstreamer *bbstreamer_zstd_decompressor_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_parser_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_terminator_new(bbstreamer *next);
 extern bbstreamer *bbstreamer_tar_archiver_new(bbstreamer *next);
diff --git a/src/bin/pg_basebackup/bbstreamer_zstd.c b/src/bin/pg_basebackup/bbstreamer_zstd.c
index 0b20267cf4..83b59d63ba 100644
--- a/src/bin/pg_basebackup/bbstreamer_zstd.c
+++ b/src/bin/pg_basebackup/bbstreamer_zstd.c
@@ -27,6 +27,7 @@ typedef struct bbstreamer_zstd_frame
 	bbstreamer	base;
 
 	ZSTD_CCtx  *cctx;
+	ZSTD_DCtx  *dctx;
 	ZSTD_outBuffer zstd_outBuf;
 } bbstreamer_zstd_frame;
 
@@ -42,6 +43,19 @@ const bbstreamer_ops bbstreamer_zstd_compressor_ops = {
 	.finalize = bbstreamer_zstd_compressor_finalize,
 	.free = bbstreamer_zstd_compressor_free
 };
+
+static void bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
+												 bbstreamer_member *member,
+												 const char *data, int len,
+												 bbstreamer_archive_context context);
+static void bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer);
+static void bbstreamer_zstd_decompressor_free(bbstreamer *streamer);
+
+const bbstreamer_ops bbstreamer_zstd_decompressor_ops = {
+	.content = bbstreamer_zstd_decompressor_content,
+	.finalize = bbstreamer_zstd_decompressor_finalize,
+	.free = bbstreamer_zstd_decompressor_free
+};
 #endif
 
 /*
@@ -200,3 +214,122 @@ bbstreamer_zstd_compressor_free(bbstreamer *streamer)
 	pfree(streamer);
 }
 #endif
+
+/*
+ * Create a new base backup streamer that performs decompression of zstd
+ * compressed blocks.
+ */
+bbstreamer *
+bbstreamer_zstd_decompressor_new(bbstreamer *next)
+{
+#ifdef HAVE_LIBZSTD
+	bbstreamer_zstd_frame *streamer;
+
+	Assert(next != NULL);
+
+	streamer = palloc0(sizeof(bbstreamer_zstd_frame));
+	*((const bbstreamer_ops **) &streamer->base.bbs_ops) =
+		&bbstreamer_zstd_decompressor_ops;
+
+	streamer->base.bbs_next = next;
+	initStringInfo(&streamer->base.bbs_buffer);
+	enlargeStringInfo(&streamer->base.bbs_buffer, ZSTD_DStreamOutSize());
+
+	streamer->dctx = ZSTD_createDCtx();
+	if (!streamer->dctx)
+	{
+		pg_log_error("could not create zstd decompression context");
+		exit(1);
+	}
+
+	/* Initialize the ZSTD output buffer. */
+	streamer->zstd_outBuf.dst = streamer->base.bbs_buffer.data;
+	streamer->zstd_outBuf.size = streamer->base.bbs_buffer.maxlen;
+	streamer->zstd_outBuf.pos = 0;
+
+	return &streamer->base;
+#else
+	pg_log_error("this build does not support compression");
+	exit(1);
+#endif
+}
+
+#ifdef HAVE_LIBZSTD
+/*
+ * Decompress the input data to output buffer until we run out of input
+ * data. Each time the output buffer is full, pass on the decompressed data
+ * to the next streamer.
+ */
+static void
+bbstreamer_zstd_decompressor_content(bbstreamer *streamer,
+									 bbstreamer_member *member,
+									 const char *data, int len,
+									 bbstreamer_archive_context context)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+	ZSTD_inBuffer inBuf = {data, len, 0};
+
+	while (inBuf.pos < inBuf.size)
+	{
+		size_t		ret;
+
+		/*
+		 * If output buffer is full then forward the content to next streamer
+		 * and update the output buffer.
+		 */
+		if (mystreamer->zstd_outBuf.pos >= mystreamer->zstd_outBuf.size)
+		{
+			bbstreamer_content(mystreamer->base.bbs_next, member,
+							   mystreamer->zstd_outBuf.dst,
+							   mystreamer->zstd_outBuf.pos,
+							   context);
+
+			/* Reset the ZSTD output buffer. */
+			mystreamer->zstd_outBuf.dst = mystreamer->base.bbs_buffer.data;
+			mystreamer->zstd_outBuf.size = mystreamer->base.bbs_buffer.maxlen;
+			mystreamer->zstd_outBuf.pos = 0;
+		}
+
+		ret = ZSTD_decompressStream(mystreamer->dctx,
+									&mystreamer->zstd_outBuf, &inBuf);
+
+		if (ZSTD_isError(ret))
+			pg_log_error("could not decompress data: %s", ZSTD_getErrorName(ret));
+	}
+}
+
+/*
+ * End-of-stream processing.
+ */
+static void
+bbstreamer_zstd_decompressor_finalize(bbstreamer *streamer)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+
+	/*
+	 * End of the stream, if there is some pending data in output buffers then
+	 * we must forward it to next streamer.
+	 */
+	if (mystreamer->zstd_outBuf.pos > 0)
+		bbstreamer_content(mystreamer->base.bbs_next, NULL,
+						   mystreamer->base.bbs_buffer.data,
+						   mystreamer->base.bbs_buffer.maxlen,
+						   BBSTREAMER_UNKNOWN);
+
+	bbstreamer_finalize(mystreamer->base.bbs_next);
+}
+
+/*
+ * Free memory.
+ */
+static void
+bbstreamer_zstd_decompressor_free(bbstreamer *streamer)
+{
+	bbstreamer_zstd_frame *mystreamer = (bbstreamer_zstd_frame *) streamer;
+
+	bbstreamer_free(streamer->bbs_next);
+	ZSTD_freeDCtx(mystreamer->dctx);
+	pfree(streamer->bbs_buffer.data);
+	pfree(streamer);
+}
+#endif
diff --git a/src/bin/pg_basebackup/pg_basebackup.c b/src/bin/pg_basebackup/pg_basebackup.c
index 7ba752c1c9..c2cb04be1f 100644
--- a/src/bin/pg_basebackup/pg_basebackup.c
+++ b/src/bin/pg_basebackup/pg_basebackup.c
@@ -1332,6 +1332,8 @@ CreateBackupStreamer(char *archive_name, char *spclocation,
 			streamer = bbstreamer_gzip_decompressor_new(streamer);
 		else if (compressmethod == COMPRESSION_LZ4)
 			streamer = bbstreamer_lz4_decompressor_new(streamer);
+		else if (compressmethod == COMPRESSION_ZSTD)
+			streamer = bbstreamer_zstd_decompressor_new(streamer);
 	}
 
 	/* Return the results. */
diff --git a/src/bin/pg_verifybackup/t/009_extract.pl b/src/bin/pg_verifybackup/t/009_extract.pl
old mode 100644
new mode 100755
index c51cdf79f8..d30ba01742
--- a/src/bin/pg_verifybackup/t/009_extract.pl
+++ b/src/bin/pg_verifybackup/t/009_extract.pl
@@ -31,6 +31,11 @@ my @test_configuration = (
 		'compression_method' => 'lz4',
 		'backup_flags' => ['--compress', 'server-lz4:5'],
 		'enabled' => check_pg_config("#define HAVE_LIBLZ4 1")
+	},
+	{
+		'compression_method' => 'zstd',
+		'backup_flags' => ['--compress', 'server-zstd:5'],
+		'enabled' => check_pg_config("#define HAVE_LIBZSTD 1")
 	}
 );
 
-- 
2.25.1

