commit 6c64cd2f3259a2f04f9deff61b7ddfebe91ba1df
Author: Nitin Motiani <nitinmotiani@google.com>
Date:   Tue Feb 11 08:31:02 2025 +0000

    Add pipe-command support for directory mode of pg_dump
    
    * We add a new flag --pipe-command which can be used in directory
      mode. This allows us to support multiple streams and we can
      do post processing like compression, filtering etc. This is
      currently not possible with directory-archive format.
    * Currently this flag is only supported with compression none
      and archive format directory.
    * This flag can't be used with the flag --file. Only one of the
      two flags can be used at a time.
    * We reuse the filename field for the --pipe-command also. And add a
      bool to specify that the field will be used as a pipe command.
    * Most of the code remains as it is. The core change is that
      in case of --pipe-command, instead of fopen we do popen.
    * The user would need a way to store the post-processing output
      in files. For that we support the same format as the directory
      mode currently does with the flag --file. We allow the user
      to add a format specifier %f to the --pipe-command. And for each
      stream, the format specifier is replaced with the corresponding
      file name. This file name is the same as it would have been if
      the flag --file had been used.
    * To enable the above, there are a few places in the code where
      we change the file name creation logic. Currently the file name
      is appended to the directory name which is provided with --file flag.
      In case of --pipe-command, we instead replace %f with the file name.
      This change is made for the common use case and separately for
      blob files.
    * There is an open question on what mode to use in case of large objects
      TOC file. Currently the code uses "ab" but that won't work for popen.
      We have proposed a few options in the comments regarding this. For the
      time being we are using mode PG_BINARY_W for the pipe use case.

diff --git a/src/bin/pg_dump/compress_gzip.c b/src/bin/pg_dump/compress_gzip.c
index 23f617209e6..92127e87889 100644
--- a/src/bin/pg_dump/compress_gzip.c
+++ b/src/bin/pg_dump/compress_gzip.c
@@ -389,8 +389,12 @@ Gzip_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
+	if(path_is_pipe_command)
+		pg_fatal("cPipe command not supported for Gzip");
+
 	CFH->open_func = Gzip_open;
 	CFH->open_write_func = Gzip_open_write;
 	CFH->read_func = Gzip_read;
@@ -415,7 +419,8 @@ InitCompressorGzip(CompressorState *cs,
 
 void
 InitCompressFileHandleGzip(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "gzip");
 }
diff --git a/src/bin/pg_dump/compress_gzip.h b/src/bin/pg_dump/compress_gzip.h
index 3bef0d5b1b8..ccf2479cf3b 100644
--- a/src/bin/pg_dump/compress_gzip.h
+++ b/src/bin/pg_dump/compress_gzip.h
@@ -19,6 +19,7 @@
 extern void InitCompressorGzip(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleGzip(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_GZIP_H_ */
diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 8c3d9c911c4..334b3208783 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -191,20 +191,29 @@ free_keep_errno(void *p)
  * Initialize a compress file handle for the specified compression algorithm.
  */
 CompressFileHandle *
-InitCompressFileHandle(const pg_compress_specification compression_spec)
+InitCompressFileHandle(const pg_compress_specification compression_spec,
+                       bool path_is_pipe_command)
 {
 	CompressFileHandle *CFH;
 
+
 	CFH = pg_malloc0(sizeof(CompressFileHandle));
 
-	if (compression_spec.algorithm == PG_COMPRESSION_NONE)
-		InitCompressFileHandleNone(CFH, compression_spec);
+	/* TODO: Currently always set to non-compressed when path_is_pipe_command 
+	 * assuming that external compressor as part of pipe is nore efficient
+	 * should review after POC
+	 */
+	if (path_is_pipe_command)
+		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
+
+	else if (compression_spec.algorithm == PG_COMPRESSION_NONE)
+		InitCompressFileHandleNone(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_GZIP)
-		InitCompressFileHandleGzip(CFH, compression_spec);
+		InitCompressFileHandleGzip(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
-		InitCompressFileHandleLZ4(CFH, compression_spec);
+		InitCompressFileHandleLZ4(CFH, compression_spec, path_is_pipe_command);
 	else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
-		InitCompressFileHandleZstd(CFH, compression_spec);
+		InitCompressFileHandleZstd(CFH, compression_spec, path_is_pipe_command);
 
 	return CFH;
 }
@@ -237,7 +246,8 @@ check_compressed_file(const char *path, char **fname, char *ext)
  * On failure, return NULL with an error code in errno.
  */
 CompressFileHandle *
-InitDiscoverCompressFileHandle(const char *path, const char *mode)
+InitDiscoverCompressFileHandle(const char *path, const char *mode,
+                               bool path_is_pipe_command)
 {
 	CompressFileHandle *CFH = NULL;
 	struct stat st;
@@ -268,7 +278,7 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
 	}
 
-	CFH = InitCompressFileHandle(compression_spec);
+	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);  /* TODO: try to make it work also with pipes */
 	if (!CFH->open_func(fname, -1, mode, CFH))
 	{
 		free_keep_errno(CFH);
diff --git a/src/bin/pg_dump/compress_io.h b/src/bin/pg_dump/compress_io.h
index db9b38744c8..d3b795d7e94 100644
--- a/src/bin/pg_dump/compress_io.h
+++ b/src/bin/pg_dump/compress_io.h
@@ -185,6 +185,11 @@ struct CompressFileHandle
 	 */
 	pg_compress_specification compression_spec;
 
+	/*
+	 * Compression specification for this file handle.
+	 */
+	bool path_is_pipe_command;
+
 	/*
 	 * Private data to be used by the compressor.
 	 */
@@ -194,7 +199,8 @@ struct CompressFileHandle
 /*
  * Initialize a compress file handle with the requested compression.
  */
-extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec);
+extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specification compression_spec,
+                                                  bool path_is_pipe_command);
 
 /*
  * Initialize a compress file stream. Infer the compression algorithm
@@ -202,6 +208,7 @@ extern CompressFileHandle *InitCompressFileHandle(const pg_compress_specificatio
  * suffixes in 'path'.
  */
 extern CompressFileHandle *InitDiscoverCompressFileHandle(const char *path,
-														  const char *mode);
+														  const char *mode,
+														  bool path_is_pipe_command);
 extern bool EndCompressFileHandle(CompressFileHandle *CFH);
 #endif
diff --git a/src/bin/pg_dump/compress_lz4.c b/src/bin/pg_dump/compress_lz4.c
index e99f0cad71f..7e7713936bf 100644
--- a/src/bin/pg_dump/compress_lz4.c
+++ b/src/bin/pg_dump/compress_lz4.c
@@ -751,10 +751,14 @@ LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH
  */
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-						  const pg_compress_specification compression_spec)
+						  const pg_compress_specification compression_spec,
+						  bool path_is_pipe_command)
 {
 	LZ4State   *state;
 
+	if(path_is_pipe_command)
+		pg_fatal("cPipe command not supported for LZ4");
+
 	CFH->open_func = LZ4Stream_open;
 	CFH->open_write_func = LZ4Stream_open_write;
 	CFH->read_func = LZ4Stream_read;
@@ -770,6 +774,8 @@ InitCompressFileHandleLZ4(CompressFileHandle *CFH,
 	if (CFH->compression_spec.level >= 0)
 		state->prefs.compressionLevel = CFH->compression_spec.level;
 
+	CFH->path_is_pipe_command = path_is_pipe_command;
+
 	CFH->private_data = state;
 }
 #else							/* USE_LZ4 */
@@ -782,7 +788,8 @@ InitCompressorLZ4(CompressorState *cs,
 
 void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-						  const pg_compress_specification compression_spec)
+						  const pg_compress_specification compression_spec,
+						  bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "LZ4");
 }
diff --git a/src/bin/pg_dump/compress_lz4.h b/src/bin/pg_dump/compress_lz4.h
index 7f7216cc648..d52e6071519 100644
--- a/src/bin/pg_dump/compress_lz4.h
+++ b/src/bin/pg_dump/compress_lz4.h
@@ -19,6 +19,7 @@
 extern void InitCompressorLZ4(CompressorState *cs,
 							  const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleLZ4(CompressFileHandle *CFH,
-									  const pg_compress_specification compression_spec);
+									  const pg_compress_specification compression_spec,
+									  bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_LZ4_H_ */
diff --git a/src/bin/pg_dump/compress_none.c b/src/bin/pg_dump/compress_none.c
index 3fc89c99854..6d69e580ed2 100644
--- a/src/bin/pg_dump/compress_none.c
+++ b/src/bin/pg_dump/compress_none.c
@@ -153,7 +153,12 @@ close_none(CompressFileHandle *CFH)
 	CFH->private_data = NULL;
 
 	if (fp)
-		ret = fclose(fp);
+	{
+		if(CFH->path_is_pipe_command)
+			ret = pclose(fp);
+		else
+			ret = fclose(fp);
+	}
 
 	return ret == 0;
 }
@@ -172,7 +177,12 @@ open_none(const char *path, int fd, const char *mode, CompressFileHandle *CFH)
 	if (fd >= 0)
 		CFH->private_data = fdopen(dup(fd), mode);
 	else
-		CFH->private_data = fopen(path, mode);
+	{
+		if (CFH->path_is_pipe_command)
+			CFH->private_data = popen(path, mode);
+		else
+			CFH->private_data = fopen(path, mode);
+	}
 
 	if (CFH->private_data == NULL)
 		return false;
@@ -185,7 +195,14 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 {
 	Assert(CFH->private_data == NULL);
 
-	CFH->private_data = fopen(path, mode);
+	pg_log_debug("Opening %s, pipe is %s",
+				 path, CFH->path_is_pipe_command ? "true" : "false");
+
+    	if (CFH->path_is_pipe_command)
+		CFH->private_data = popen(path, mode);
+	else
+		CFH->private_data = fopen(path, mode);
+
 	if (CFH->private_data == NULL)
 		return false;
 
@@ -198,7 +215,8 @@ open_write_none(const char *path, const char *mode, CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleNone(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
 	CFH->open_func = open_none;
 	CFH->open_write_func = open_write_none;
@@ -210,5 +228,7 @@ InitCompressFileHandleNone(CompressFileHandle *CFH,
 	CFH->eof_func = eof_none;
 	CFH->get_error_func = get_error_none;
 
+	CFH->path_is_pipe_command = path_is_pipe_command;
+
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_none.h b/src/bin/pg_dump/compress_none.h
index f927f196c36..1399c8bde3b 100644
--- a/src/bin/pg_dump/compress_none.h
+++ b/src/bin/pg_dump/compress_none.h
@@ -19,6 +19,7 @@
 extern void InitCompressorNone(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleNone(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* _COMPRESS_NONE_H_ */
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
index 1f7b4942706..f38c085d37a 100644
--- a/src/bin/pg_dump/compress_zstd.c
+++ b/src/bin/pg_dump/compress_zstd.c
@@ -26,7 +26,7 @@ InitCompressorZstd(CompressorState *cs, const pg_compress_specification compress
 }
 
 void
-InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec)
+InitCompressFileHandleZstd(CompressFileHandle *CFH, const pg_compress_specification compression_spec, bool path_is_pipe_command)
 {
 	pg_fatal("this build does not support compression with %s", "ZSTD");
 }
@@ -523,8 +523,12 @@ Zstd_get_error(CompressFileHandle *CFH)
 
 void
 InitCompressFileHandleZstd(CompressFileHandle *CFH,
-						   const pg_compress_specification compression_spec)
+						   const pg_compress_specification compression_spec,
+						   bool path_is_pipe_command)
 {
+	if(path_is_pipe_command)
+		pg_fatal("cPipe command not supported for Zstd");
+
 	CFH->open_func = Zstd_open;
 	CFH->open_write_func = Zstd_open_write;
 	CFH->read_func = Zstd_read;
@@ -536,6 +540,7 @@ InitCompressFileHandleZstd(CompressFileHandle *CFH,
 	CFH->get_error_func = Zstd_get_error;
 
 	CFH->compression_spec = compression_spec;
+	CFH->path_is_pipe_command = path_is_pipe_command;
 
 	CFH->private_data = NULL;
 }
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
index af21db48ded..1644b6d6eba 100644
--- a/src/bin/pg_dump/compress_zstd.h
+++ b/src/bin/pg_dump/compress_zstd.h
@@ -20,6 +20,7 @@
 extern void InitCompressorZstd(CompressorState *cs,
 							   const pg_compress_specification compression_spec);
 extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
-									   const pg_compress_specification compression_spec);
+									   const pg_compress_specification compression_spec,
+									   bool path_is_pipe_command);
 
 #endif							/* COMPRESS_ZSTD_H */
diff --git a/src/bin/pg_dump/pg_backup.h b/src/bin/pg_dump/pg_backup.h
index af0007fb6d2..18be674ab7f 100644
--- a/src/bin/pg_dump/pg_backup.h
+++ b/src/bin/pg_dump/pg_backup.h
@@ -311,14 +311,15 @@ extern void ProcessArchiveRestoreOptions(Archive *AHX);
 extern void RestoreArchive(Archive *AHX, bool append_data);
 
 /* Open an existing archive */
-extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt);
+extern Archive *OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe);
 
 /* Create a new archive */
 extern Archive *CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 							  const pg_compress_specification compression_spec,
 							  bool dosync, ArchiveMode mode,
 							  SetupWorkerPtrType setupDumpWorker,
-							  DataDirSyncMethod sync_method);
+							  DataDirSyncMethod sync_method,
+							  bool FileSpecIsPipe); /* ^^ what are the casing rules here ?? */
 
 /* The --list option */
 extern void PrintTOCSummary(Archive *AHX);
diff --git a/src/bin/pg_dump/pg_backup_archiver.c b/src/bin/pg_dump/pg_backup_archiver.c
index f961162f365..83e66873a5d 100644
--- a/src/bin/pg_dump/pg_backup_archiver.c
+++ b/src/bin/pg_dump/pg_backup_archiver.c
@@ -54,7 +54,7 @@ static ArchiveHandle *_allocAH(const char *FileSpec, const ArchiveFormat fmt,
 							   const pg_compress_specification compression_spec,
 							   bool dosync, ArchiveMode mode,
 							   SetupWorkerPtrType setupWorkerPtr,
-							   DataDirSyncMethod sync_method);
+							   DataDirSyncMethod sync_method, bool FileSpecIsPipe);
 static void _getObjectDescription(PQExpBuffer buf, const TocEntry *te);
 static void _printTocEntry(ArchiveHandle *AH, TocEntry *te, const char *pfx);
 static char *sanitize_line(const char *str, bool want_hyphen);
@@ -230,11 +230,12 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 			  const pg_compress_specification compression_spec,
 			  bool dosync, ArchiveMode mode,
 			  SetupWorkerPtrType setupDumpWorker,
-			  DataDirSyncMethod sync_method)
+			  DataDirSyncMethod sync_method,
+			  bool FileSpecIsPipe)
 
 {
 	ArchiveHandle *AH = _allocAH(FileSpec, fmt, compression_spec,
-								 dosync, mode, setupDumpWorker, sync_method);
+								 dosync, mode, setupDumpWorker, sync_method, FileSpecIsPipe);
 
 	return (Archive *) AH;
 }
@@ -242,7 +243,7 @@ CreateArchive(const char *FileSpec, const ArchiveFormat fmt,
 /* Open an existing archive */
 /* Public */
 Archive *
-OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
+OpenArchive(const char *FileSpec, const ArchiveFormat fmt, bool FileSpecIsPipe)
 {
 	ArchiveHandle *AH;
 	pg_compress_specification compression_spec = {0};
@@ -250,7 +251,7 @@ OpenArchive(const char *FileSpec, const ArchiveFormat fmt)
 	compression_spec.algorithm = PG_COMPRESSION_NONE;
 	AH = _allocAH(FileSpec, fmt, compression_spec, true,
 				  archModeRead, setupRestoreWorker,
-				  DATA_DIR_SYNC_METHOD_FSYNC);
+				  DATA_DIR_SYNC_METHOD_FSYNC, FileSpecIsPipe);
 
 	return (Archive *) AH;
 }
@@ -1705,7 +1706,7 @@ SetOutput(ArchiveHandle *AH, const char *filename,
 	else
 		mode = PG_BINARY_W;
 
-	CFH = InitCompressFileHandle(compression_spec);
+	CFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
 
 	if (!CFH->open_func(filename, fn, mode, CFH))
 	{
@@ -2362,7 +2363,8 @@ static ArchiveHandle *
 _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 		 const pg_compress_specification compression_spec,
 		 bool dosync, ArchiveMode mode,
-		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method)
+		 SetupWorkerPtrType setupWorkerPtr, DataDirSyncMethod sync_method,
+		 bool FileSpecIsPipe)
 {
 	ArchiveHandle *AH;
 	CompressFileHandle *CFH;
@@ -2403,6 +2405,8 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 	else
 		AH->fSpec = NULL;
 
+	AH->fSpecIsPipe = FileSpecIsPipe;
+
 	AH->currUser = NULL;		/* unknown */
 	AH->currSchema = NULL;		/* ditto */
 	AH->currTablespace = NULL;	/* ditto */
@@ -2415,14 +2419,14 @@ _allocAH(const char *FileSpec, const ArchiveFormat fmt,
 
 	AH->mode = mode;
 	AH->compression_spec = compression_spec;
-	AH->dosync = dosync;
+	AH->dosync = FileSpecIsPipe ? false : dosync;
 	AH->sync_method = sync_method;
 
 	memset(&(AH->sqlparse), 0, sizeof(AH->sqlparse));
 
 	/* Open stdout with no compression for AH output handle */
 	out_compress_spec.algorithm = PG_COMPRESSION_NONE;
-	CFH = InitCompressFileHandle(out_compress_spec);
+	CFH = InitCompressFileHandle(out_compress_spec, AH->fSpecIsPipe);
 	if (!CFH->open_func(NULL, fileno(stdout), PG_BINARY_A, CFH))
 		pg_fatal("could not open stdout for appending: %m");
 	AH->OF = CFH;
diff --git a/src/bin/pg_dump/pg_backup_archiver.h b/src/bin/pg_dump/pg_backup_archiver.h
index 365073b3eae..d7fa3086184 100644
--- a/src/bin/pg_dump/pg_backup_archiver.h
+++ b/src/bin/pg_dump/pg_backup_archiver.h
@@ -301,6 +301,7 @@ struct _archiveHandle
 	int			loCount;		/* # of LOs restored */
 
 	char	   *fSpec;			/* Archive File Spec */
+	bool        fSpecIsPipe;     /* fSpec is a pipe command template requiring replacing %f with file name */
 	FILE	   *FH;				/* General purpose file handle */
 	void	   *OF;				/* Output file */
 
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index b2a841bb0ff..193e9825a16 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -39,7 +39,8 @@
 #include <dirent.h>
 #include <sys/stat.h>
 
-#include "common/file_utils.h"
+//#include "common/file_utils.h"
+#include "common/percentrepl.h"
 #include "compress_io.h"
 #include "parallel.h"
 #include "pg_backup_utils.h"
@@ -158,39 +159,41 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 	{
 		struct stat st;
 		bool		is_empty = false;
-
-		/* we accept an empty existing directory */
-		if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
+        
+		if(!AH->fSpecIsPipe) /* no checks for pipe */
 		{
-			DIR		   *dir = opendir(ctx->directory);
-
-			if (dir)
+			/* we accept an empty existing directory */
+			if (stat(ctx->directory, &st) == 0 && S_ISDIR(st.st_mode))
 			{
-				struct dirent *d;
+				DIR		   *dir = opendir(ctx->directory);
 
-				is_empty = true;
-				while (errno = 0, (d = readdir(dir)))
+				if (dir)
 				{
-					if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
+					struct dirent *d;
+
+					is_empty = true;
+					while (errno = 0, (d = readdir(dir)))
 					{
-						is_empty = false;
-						break;
+						if (strcmp(d->d_name, ".") != 0 && strcmp(d->d_name, "..") != 0)
+						{
+							is_empty = false;
+							break;
+						}
 					}
-				}
 
-				if (errno)
-					pg_fatal("could not read directory \"%s\": %m",
-							 ctx->directory);
+					if (errno)
+						pg_fatal("could not read directory \"%s\": %m",
+								ctx->directory);
 
-				if (closedir(dir))
-					pg_fatal("could not close directory \"%s\": %m",
-							 ctx->directory);
+					if (closedir(dir))
+						pg_fatal("could not close directory \"%s\": %m",
+								ctx->directory);
+				}
 			}
+			if (!is_empty && mkdir(ctx->directory, 0700) < 0)
+				pg_fatal("could not create directory \"%s\": %m",
+						ctx->directory);
 		}
-
-		if (!is_empty && mkdir(ctx->directory, 0700) < 0)
-			pg_fatal("could not create directory \"%s\": %m",
-					 ctx->directory);
 	}
 	else
 	{							/* Read Mode */
@@ -199,7 +202,7 @@ InitArchiveFmt_Directory(ArchiveHandle *AH)
 
 		setFilePath(AH, fname, "toc.dat");
 
-		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R);
+		tocFH = InitDiscoverCompressFileHandle(fname, PG_BINARY_R, AH->fSpecIsPipe);
 		if (tocFH == NULL)
 			pg_fatal("could not open input file \"%s\": %m", fname);
 
@@ -327,7 +330,7 @@ _StartData(ArchiveHandle *AH, TocEntry *te)
 
 	setFilePath(AH, fname, tctx->filename);
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
 
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
@@ -391,7 +394,7 @@ _PrintFileData(ArchiveHandle *AH, char *filename)
 	if (!filename)
 		return;
 
-	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R);
+	CFH = InitDiscoverCompressFileHandle(filename, PG_BINARY_R, AH->fSpecIsPipe);
 	if (!CFH)
 		pg_fatal("could not open input file \"%s\": %m", filename);
 
@@ -449,7 +452,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	 */
 	setFilePath(AH, tocfname, tctx->filename);
 
-	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R);
+	CFH = ctx->LOsTocFH = InitDiscoverCompressFileHandle(tocfname, PG_BINARY_R, AH->fSpecIsPipe);
 
 	if (ctx->LOsTocFH == NULL)
 		pg_fatal("could not open large object TOC file \"%s\" for input: %m",
@@ -460,6 +463,7 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 	{
 		char		lofname[MAXPGPATH + 1];
 		char		path[MAXPGPATH];
+		char* pipe;
 
 		/* Can't overflow because line and lofname are the same length */
 		if (sscanf(line, "%u %" CppAsString2(MAXPGPATH) "s\n", &oid, lofname) != 2)
@@ -595,7 +599,7 @@ _CloseArchive(ArchiveHandle *AH)
 
 		/* The TOC is always created uncompressed */
 		compression_spec.algorithm = PG_COMPRESSION_NONE;
-		tocFH = InitCompressFileHandle(compression_spec);
+		tocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
 		if (!tocFH->open_write_func(fname, PG_BINARY_W, tocFH))
 			pg_fatal("could not open output file \"%s\": %m", fname);
 		ctx->dataFH = tocFH;
@@ -656,13 +660,42 @@ _StartLOs(ArchiveHandle *AH, TocEntry *te)
 	lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 	pg_compress_specification compression_spec = {0};
 	char		fname[MAXPGPATH];
+	const char *mode;
 
 	setFilePath(AH, fname, tctx->filename);
 
 	/* The LO TOC file is never compressed */
 	compression_spec.algorithm = PG_COMPRESSION_NONE;
-	ctx->LOsTocFH = InitCompressFileHandle(compression_spec);
-	if (!ctx->LOsTocFH->open_write_func(fname, "ab", ctx->LOsTocFH))
+	ctx->LOsTocFH = InitCompressFileHandle(compression_spec, AH->fSpecIsPipe);
+	/* TODO: Finalize the correct approach for the mode.
+	 * The mode for the LOs TOC file has been "ab" from the start. That
+	 * is something we can't do for pipe-command as popen only supports
+	 * read and write. Just changing it to 'w' was not expected to be enough
+	 * and one possible solution considered is to open it in 'w' mode and
+	 * keep it open till all the LOs in the dump group are done.
+	 * 
+	 * The analysis of the current code shows that there is one ToCEntry
+	 * per blob group. And it is written by @WriteDataChunksForToCEntry.
+	 * This function calls _StartLOs once before the dumper function and
+	 * and _EndLOs once after the dumper. And the dumper dumps all the
+	 * LOs in the group. So a blob_NNN.toc is only opened once and closed
+	 * after all the entries are written. Therefore the mode can be made 'w'
+	 * for all the cases. We tested changing the mode to PG_BINARY_W and
+	 * the tests passed. But in case there are some missing scenarios, we
+	 * have not made that change here. Instead for now only doing it for the
+         * pipe command.
+	 * In short there are 3 solutions :
+	 * 1. Change the mode for everything (preferred)
+	 * 2. Change it only for pipe-command (done for time-being)
+	 * 3. Change it for pipe-command and then cache those handles and
+         *    close them in the end (based on the code review, we might
+	 *    pick this).
+	 */
+	if (AH->fSpecIsPipe)
+		mode = PG_BINARY_W;
+	else
+		mode = "ab";
+	if (!ctx->LOsTocFH->open_write_func(fname, mode, ctx->LOsTocFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
 
@@ -676,10 +709,22 @@ _StartLO(ArchiveHandle *AH, TocEntry *te, Oid oid)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char		fname[MAXPGPATH];
+	char* pipe;
+	char blob_name[MAXPGPATH];
 
-	snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+	if(AH->fSpecIsPipe) 
+	{
+		snprintf(blob_name, MAXPGPATH, "blob_%u.dat", oid);
+		pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", blob_name);
+		strcpy(fname, pipe);
+		/* TODO:figure out how to free the allocated string when replace_percent_placeholders isused in frontend*/
+	}
+	 else
+	{
+		snprintf(fname, MAXPGPATH, "%s/blob_%u.dat", ctx->directory, oid);
+	}
 
-	ctx->dataFH = InitCompressFileHandle(AH->compression_spec);
+	ctx->dataFH = InitCompressFileHandle(AH->compression_spec, AH->fSpecIsPipe);
 	if (!ctx->dataFH->open_write_func(fname, PG_BINARY_W, ctx->dataFH))
 		pg_fatal("could not open output file \"%s\": %m", fname);
 }
@@ -740,15 +785,26 @@ setFilePath(ArchiveHandle *AH, char *buf, const char *relativeFilename)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	char	   *dname;
+	char	   *pipe;
 
 	dname = ctx->directory;
 
-	if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
-		pg_fatal("file name too long: \"%s\"", dname);
 
-	strcpy(buf, dname);
-	strcat(buf, "/");
-	strcat(buf, relativeFilename);
+	if(AH->fSpecIsPipe) 
+	{
+		pipe = replace_percent_placeholders(dname, "pipe-command", "f", relativeFilename);
+		strcpy(buf, pipe);
+		/* TODO:figure out how to free the allocated string when replace_percent_placeholders isused in frontend*/
+	}
+	else /* replace all ocurrences of %f in dname with relativeFilename */
+	{
+		if (strlen(dname) + 1 + strlen(relativeFilename) + 1 > MAXPGPATH)
+			pg_fatal("file name too long: \"%s\"", dname);
+
+		strcpy(buf, dname);
+		strcat(buf, "/");
+		strcat(buf, relativeFilename);
+	}
 }
 
 /*
@@ -790,17 +846,24 @@ _PrepParallelRestore(ArchiveHandle *AH)
 		 * only need an approximate indicator of that.
 		 */
 		setFilePath(AH, fname, tctx->filename);
+		pg_log_error("filename: %s", fname);
 
 		if (stat(fname, &st) == 0)
 			te->dataLength = st.st_size;
 		else if (AH->compression_spec.algorithm != PG_COMPRESSION_NONE)
 		{
+			if(AH->fSpecIsPipe)
+				pg_log_error("pipe and compressed");
+				
 			if (AH->compression_spec.algorithm == PG_COMPRESSION_GZIP)
 				strlcat(fname, ".gz", sizeof(fname));
 			else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
 				strlcat(fname, ".lz4", sizeof(fname));
-			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+			else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD){
+				pg_log_error("filename: %s", fname);
 				strlcat(fname, ".zst", sizeof(fname));
+				pg_log_error("filename: %s", fname);
+			}
 
 			if (stat(fname, &st) == 0)
 				te->dataLength = st.st_size;
diff --git a/src/bin/pg_dump/pg_dump.c b/src/bin/pg_dump/pg_dump.c
index 8e6364d32d7..ce2e5eeccd8 100644
--- a/src/bin/pg_dump/pg_dump.c
+++ b/src/bin/pg_dump/pg_dump.c
@@ -409,6 +409,7 @@ main(int argc, char **argv)
 {
 	int			c;
 	const char *filename = NULL;
+	bool        filename_is_pipe = false;
 	const char *format = "p";
 	TableInfo  *tblinfo;
 	int			numTables;
@@ -528,6 +529,7 @@ main(int argc, char **argv)
 		{"filter", required_argument, NULL, 16},
 		{"exclude-extension", required_argument, NULL, 17},
 		{"sequence-data", no_argument, &dopt.sequence_data, 1},
+		{"pipe-command", required_argument, NULL, 25},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -599,7 +601,12 @@ main(int argc, char **argv)
 				break;
 
 			case 'f':
+                if(filename != NULL){
+					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+					exit_nicely(1);
+				}
 				filename = pg_strdup(optarg);
+				filename_is_pipe = false; /* it already is, setting again here just for clarity */
 				break;
 
 			case 'F':
@@ -796,6 +803,15 @@ main(int argc, char **argv)
 				with_statistics = true;
 				break;
 
+			case 25:			/* pipe command */
+                if(filename != NULL){
+					pg_log_error_hint("Only one of [--file, --pipe-command] allowed");
+					exit_nicely(1);
+				}
+				filename = pg_strdup(optarg);
+				filename_is_pipe = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -888,13 +904,26 @@ main(int argc, char **argv)
 	if (archiveFormat == archNull)
 		plainText = 1;
 
+	if (filename_is_pipe && archiveFormat != archDirectory)
+	{
+		pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+		exit_nicely(1);
+	}
+
+	if (filename_is_pipe && strcmp(compression_algorithm_str, "none") != 0)
+	{
+		pg_log_error_hint("Option --pipe-command is not supported with any compression type.");
+		exit_nicely(1);
+	}
+
 	/*
 	 * Custom and directory formats are compressed by default with gzip when
 	 * available, not the others.  If gzip is not available, no compression is
-	 * done by default.
+	 * done by default. If directory format is being used with pipe-command,
+	 * no compression is done.
 	 */
 	if ((archiveFormat == archCustom || archiveFormat == archDirectory) &&
-		!user_compression_defined)
+		!filename_is_pipe && !user_compression_defined)
 	{
 #ifdef HAVE_LIBZ
 		compression_algorithm_str = "gzip";
@@ -944,7 +973,7 @@ main(int argc, char **argv)
 
 	/* Open the output file */
 	fout = CreateArchive(filename, archiveFormat, compression_spec,
-						 dosync, archiveMode, setupDumpWorker, sync_method);
+						 dosync, archiveMode, setupDumpWorker, sync_method, filename_is_pipe);
 
 	/* Make dump options accessible right away */
 	SetArchiveOptions(fout, &dopt, NULL);
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index fe33b283a01..9b757dac568 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -594,7 +594,7 @@ restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
 	Archive    *AH;
 	int			n_errors;
 
-	AH = OpenArchive(inputFileSpec, opts->format);
+	AH = OpenArchive(inputFileSpec, opts->format, false); /*TODO: support pipes in restore */
 
 	SetArchiveOptions(AH, NULL, opts);
 
