commit 79c2fee3856e9c2263658a59b6a27caca95011e0
Author: Nitin Motiani <nitinmotiani@google.com>
Date:   Sat Feb 15 08:05:25 2025 +0000

    Add pipe-command support in pg_restore
    
    * This is same as the pg_dump change. We add support
      for --pipe-command in directory archive format. This can be used
      to read from multiple streams and do pre-processing (decompression
      with a custom algorithm, filtering etc) before restore.
      Currently that is not possible because the pg_dump output of
      directory format can't just be piped.
    * Like pg_dump, here also either filename or --pipe-command can be
      set. If neither are set, the standard input is used as before.
    * This is only supported with compression none and archive format
      directory.
    * We reuse the inputFileSpec field for the pipe-command. And add
      a bool to specify if it is a pipe.
    * The changes made for pg_dump to handle the pipe case with popen
      and pclose also work here.
    * The logic of %f format specifier to read from the pg_dump output
      is the same too. Most of the code from the pg_dump commit works.
      We add similar logic to the function to read large objects.
    * The --pipe command works -l and -L option.

diff --git a/src/bin/pg_dump/compress_io.c b/src/bin/pg_dump/compress_io.c
index 334b3208783..b58708cf8b0 100644
--- a/src/bin/pg_dump/compress_io.c
+++ b/src/bin/pg_dump/compress_io.c
@@ -259,26 +259,32 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode,
 	Assert(strcmp(mode, PG_BINARY_R) == 0);
 
 	fname = pg_strdup(path);
-
-	if (hasSuffix(fname, ".gz"))
-		compression_spec.algorithm = PG_COMPRESSION_GZIP;
-	else if (hasSuffix(fname, ".lz4"))
-		compression_spec.algorithm = PG_COMPRESSION_LZ4;
-	else if (hasSuffix(fname, ".zst"))
-		compression_spec.algorithm = PG_COMPRESSION_ZSTD;
-	else
+	/*
+	   If the path is a pipe command, the compression algorithm
+	   is none.
+	*/
+	if (!path_is_pipe_command)
 	{
-		if (stat(path, &st) == 0)
-			compression_spec.algorithm = PG_COMPRESSION_NONE;
-		else if (check_compressed_file(path, &fname, "gz"))
+		if (hasSuffix(fname, ".gz"))
 			compression_spec.algorithm = PG_COMPRESSION_GZIP;
-		else if (check_compressed_file(path, &fname, "lz4"))
+		else if (hasSuffix(fname, ".lz4"))
 			compression_spec.algorithm = PG_COMPRESSION_LZ4;
-		else if (check_compressed_file(path, &fname, "zst"))
+		else if (hasSuffix(fname, ".zst"))
 			compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+		else
+		{
+			if (stat(path, &st) == 0)
+				compression_spec.algorithm = PG_COMPRESSION_NONE;
+			else if (check_compressed_file(path, &fname, "gz"))
+				compression_spec.algorithm = PG_COMPRESSION_GZIP;
+			else if (check_compressed_file(path, &fname, "lz4"))
+				compression_spec.algorithm = PG_COMPRESSION_LZ4;
+			else if (check_compressed_file(path, &fname, "zst"))
+				compression_spec.algorithm = PG_COMPRESSION_ZSTD;
+		}
 	}
 
-	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);  /* TODO: try to make it work also with pipes */
+	CFH = InitCompressFileHandle(compression_spec, path_is_pipe_command);
 	if (!CFH->open_func(fname, -1, mode, CFH))
 	{
 		free_keep_errno(CFH);
diff --git a/src/bin/pg_dump/pg_backup_directory.c b/src/bin/pg_dump/pg_backup_directory.c
index 193e9825a16..936b4a194a5 100644
--- a/src/bin/pg_dump/pg_backup_directory.c
+++ b/src/bin/pg_dump/pg_backup_directory.c
@@ -471,7 +471,18 @@ _LoadLOs(ArchiveHandle *AH, TocEntry *te)
 					 tocfname, line);
 
 		StartRestoreLO(AH, oid, AH->public.ropt->dropSchema);
-		snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+		/* TODO: This logic for naming blob files is common betwen _LoadLOs an _StartLO.
+		 * Refactor in a helper function.
+		 */
+		if (AH->fSpecIsPipe)
+		{
+			pipe = replace_percent_placeholders(ctx->directory, "pipe-command", "f", lofname);
+			strcpy(path, pipe);
+		}
+		else
+		{
+			snprintf(path, MAXPGPATH, "%s/%s", ctx->directory, lofname);
+		}
 		_PrintFileData(AH, path);
 		EndRestoreLO(AH, oid);
 	}
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 9b757dac568..f9fe72bde6a 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -59,7 +59,7 @@ static void usage(const char *progname);
 static void read_restore_filters(const char *filename, RestoreOptions *opts);
 static bool file_exists_in_directory(const char *dir, const char *filename);
 static int	restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
-								 int numWorkers, bool append_data, int num);
+								 int numWorkers, bool append_data, int num, bool filespec_is_pipe);
 static int	read_one_statement(StringInfo inBuf, FILE *pfile);
 static int	restore_all_databases(PGconn *conn, const char *dumpdirpath,
 								  SimpleStringList db_exclude_patterns, RestoreOptions *opts, int numWorkers);
@@ -84,6 +84,7 @@ main(int argc, char **argv)
 	int			n_errors = 0;
 	bool		globals_only = false;
 	SimpleStringList db_exclude_patterns = {NULL, NULL};
+	bool 	   	filespec_is_pipe = false;
 	static int	disable_triggers = 0;
 	static int	enable_row_security = 0;
 	static int	if_exists = 0;
@@ -165,6 +166,7 @@ main(int argc, char **argv)
 		{"statistics-only", no_argument, &statistics_only, 1},
 		{"filter", required_argument, NULL, 4},
 		{"exclude-database", required_argument, NULL, 6},
+		{"pipe-command", required_argument, NULL, 7},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -346,6 +348,11 @@ main(int argc, char **argv)
 				simple_string_list_append(&db_exclude_patterns, optarg);
 				break;
 
+			case 7:			/* pipe-command */
+				inputFileSpec = pg_strdup(optarg);
+				filespec_is_pipe = true;
+				break;
+
 			default:
 				/* getopt_long already emitted a complaint */
 				pg_log_error_hint("Try \"%s --help\" for more information.", progname);
@@ -353,11 +360,23 @@ main(int argc, char **argv)
 		}
 	}
 
-	/* Get file name from command line */
+	/* Get file name from command line. Note that filename argument and pipe-command can't both be set. */
 	if (optind < argc)
+	{
+		if (filespec_is_pipe)
+		{
+			pg_log_error_hint("Only one of [filespec, --pipe-command] allowed");
+			exit_nicely(1);
+		}
 		inputFileSpec = argv[optind++];
-	else
+	}
+	/* Even if the file argument is not provided, if the pipe-command is specified, we need to use that
+	 * as the file arg and not fallback to stdio.
+	 */
+	else if (!filespec_is_pipe)
+	{
 		inputFileSpec = NULL;
+	}
 
 	/* Complain if any arguments remain */
 	if (optind < argc)
@@ -567,7 +586,7 @@ main(int argc, char **argv)
 		if (globals_only)
 			pg_fatal("option -g/--globals-only can be used only when restoring an archive created by pg_dumpall");
 
-		n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0);
+		n_errors = restore_one_database(inputFileSpec, opts, numWorkers, false, 0, filespec_is_pipe);
 	}
 
 	/* Done, print a summary of ignored errors during restore. */
@@ -589,12 +608,18 @@ main(int argc, char **argv)
  */
 static int
 restore_one_database(const char *inputFileSpec, RestoreOptions *opts,
-					 int numWorkers, bool append_data, int num)
+					 int numWorkers, bool append_data, int num, bool filespec_is_pipe)
 {
 	Archive    *AH;
 	int			n_errors;
 
-	AH = OpenArchive(inputFileSpec, opts->format, false); /*TODO: support pipes in restore */
+	if (filespec_is_pipe && opts->format != archDirectory)
+	{
+		pg_log_error_hint("Option --pipe-command is only supported with directory format.");
+		exit_nicely(1);
+	}
+
+	AH = OpenArchive(inputFileSpec, opts->format, filespec_is_pipe);
 
 	SetArchiveOptions(AH, NULL, opts);
 
@@ -1241,7 +1266,7 @@ restore_all_databases(PGconn *conn, const char *dumpdirpath,
 		opts->dumpStatistics = dumpStatistics;
 
 		/* Restore the single database. */
-		n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count);
+		n_errors = restore_one_database(subdirpath, opts, numWorkers, true, count, false);
 
 		/* Print a summary of ignored errors during single database restore. */
 		if (n_errors)
