From 8de74fda7825301e3f10b10ce132751386ea5fb7 Mon Sep 17 00:00:00 2001
From: Mahendra Singh Thalor <mahi6run@gmail.com>
Date: Wed, 1 Jan 2025 12:19:08 -0800
Subject: [PATCH] pg_dumpall with directory format and restore it by pg_restore

new option to pg_dumpall:
-F, --format=d|p|directory|plain output file format (directory, plain text (default))

Ex: ./pg_dumpall --format=directory --file=dumpDirName

dumps are as:
global.dat ::: global sql commands in simple plain format
map.dat.   ::: dboid dbname ---entries for all databases in simple text form
databases. :::
      subdir     dboid1  -> toc.dat and data files in archive format
      subdir     dboid2. -> toc.dat and data files in archive format
              etc
---------------------------------------------------------------------------
NOTE:
if needed, restore single db by particular subdir

Ex: ./pg_restore --format=directory -d postgres dumpDirName/databases/5
   -- here, 5 is the dboid of postgres db
   -- to get dboid, refer dbname in map.file

--------------------------------------------------------------------------
new options to pg_restore:
-g, --globals-only           restore only global objects, no databases
--exclude-database=NAME   exclude database whose name matches name

When we give -g/--globals-only option, then only restore globals, no db restoring.

Design:
When --format=directory is specified and there is no toc.dat in main directory, then check
for global.dat and map.dat to restore all databases. If both files are exists in directory,
then first restore all globals from global.dat and then restore all databases one by one
from map.dat list.

TODO: We can restore databases in parallel mode.
---
 doc/src/sgml/ref/pg_dumpall.sgml |  30 ++
 doc/src/sgml/ref/pg_restore.sgml |  30 ++
 src/bin/pg_dump/pg_dumpall.c     | 138 ++++++--
 src/bin/pg_dump/pg_restore.c     | 661 ++++++++++++++++++++++++++++++++++++++-
 4 files changed, 829 insertions(+), 30 deletions(-)

diff --git a/doc/src/sgml/ref/pg_dumpall.sgml b/doc/src/sgml/ref/pg_dumpall.sgml
index 014f279..b6c9feb 100644
--- a/doc/src/sgml/ref/pg_dumpall.sgml
+++ b/doc/src/sgml/ref/pg_dumpall.sgml
@@ -582,6 +582,36 @@ exclude database <replaceable class="parameter">PATTERN</replaceable>
       </listitem>
      </varlistentry>
 
+    <varlistentry>
+      <term><option>-F <replaceable class="parameter">format</replaceable></option></term>
+      <term><option>--format=<replaceable class="parameter">format</replaceable></option></term>
+      <listitem>
+       <para>
+        Specify format of dump files.  If we want to dump all the databases, then pass this as directory so that dump of all databases can be taken in separate subdirectory in archive format.
+by default, this is plain.
+
+       <variablelist>
+        <varlistentry>
+         <term><literal>d</literal></term>
+         <term><literal>directory</literal></term>
+         <listitem>
+          <para>
+           The archive is a directory archive.
+          </para>
+         </listitem>
+        </varlistentry>
+
+       <variablelist>
+        <varlistentry>
+         <term><literal>p</literal></term>
+         <term><literal>plain</literal></term>
+         <listitem>
+          <para>
+           The archive is a plain archive.(by default also)
+          </para>
+         </listitem>
+        </varlistentry>
+
      <varlistentry>
        <term><option>-?</option></term>
        <term><option>--help</option></term>
diff --git a/doc/src/sgml/ref/pg_restore.sgml b/doc/src/sgml/ref/pg_restore.sgml
index b8b27e1..ab2e035 100644
--- a/doc/src/sgml/ref/pg_restore.sgml
+++ b/doc/src/sgml/ref/pg_restore.sgml
@@ -316,6 +316,16 @@ PostgreSQL documentation
      </varlistentry>
 
      <varlistentry>
+      <term><option>-g</option></term>
+      <term><option>--globals-only</option></term>
+      <listitem>
+       <para>
+        Restore only global objects (roles and tablespaces), no databases.
+       </para>
+      </listitem>
+     </varlistentry>
+
+     <varlistentry>
       <term><option>-I <replaceable class="parameter">index</replaceable></option></term>
       <term><option>--index=<replaceable class="parameter">index</replaceable></option></term>
       <listitem>
@@ -932,6 +942,26 @@ PostgreSQL documentation
       </listitem>
      </varlistentry>
 
+     <varlistentry>
+      <term><option>--exclude-database=<replaceable class="parameter">pattern</replaceable></option></term>
+      <listitem>
+       <para>
+        Do not restore databases whose name matches
+        <replaceable class="parameter">pattern</replaceable>.
+        Multiple patterns can be excluded by writing multiple
+        <option>--exclude-database</option> switches.  The
+        <replaceable class="parameter">pattern</replaceable> parameter is
+        interpreted as a pattern according to the same rules used by
+        <application>psql</application>'s <literal>\d</literal>
+        commands (see <xref linkend="app-psql-patterns"/>),
+        so multiple databases can also be excluded by writing wildcard
+        characters in the pattern.  When using wildcards, be careful to
+        quote the pattern if needed to prevent shell wildcard expansion.
+       </para>
+      </listitem>
+     </varlistentry>
+
+
     </variablelist>
    </para>
  </refsect1>
diff --git a/src/bin/pg_dump/pg_dumpall.c b/src/bin/pg_dump/pg_dumpall.c
index 396f797..066197b 100644
--- a/src/bin/pg_dump/pg_dumpall.c
+++ b/src/bin/pg_dump/pg_dumpall.c
@@ -15,6 +15,7 @@
 
 #include "postgres_fe.h"
 
+#include <sys/stat.h>
 #include <time.h>
 #include <unistd.h>
 
@@ -29,6 +30,7 @@
 #include "filter.h"
 #include "getopt_long.h"
 #include "pg_backup.h"
+#include "pg_backup_archiver.h"
 
 /* version string we expect back from pg_dump */
 #define PGDUMP_VERSIONSTR "pg_dump (PostgreSQL) " PG_VERSION "\n"
@@ -64,9 +66,10 @@ static void dropTablespaces(PGconn *conn);
 static void dumpTablespaces(PGconn *conn);
 static void dropDBs(PGconn *conn);
 static void dumpUserConfig(PGconn *conn, const char *username);
-static void dumpDatabases(PGconn *conn);
+static void dumpDatabases(PGconn *conn, bool directory_format);
 static void dumpTimestamp(const char *msg);
-static int	runPgDump(const char *dbname, const char *create_opts);
+static int runPgDump(const char *dbname, const char *create_opts,
+					 char *dbfile);
 static void buildShSecLabels(PGconn *conn,
 							 const char *catalog_name, Oid objectId,
 							 const char *objtype, const char *objname,
@@ -147,6 +150,7 @@ main(int argc, char *argv[])
 		{"password", no_argument, NULL, 'W'},
 		{"no-privileges", no_argument, NULL, 'x'},
 		{"no-acl", no_argument, NULL, 'x'},
+		{"format", required_argument, NULL, 'F'},
 
 		/*
 		 * the following options don't have an equivalent short option letter
@@ -188,11 +192,13 @@ main(int argc, char *argv[])
 	char	   *pgdb = NULL;
 	char	   *use_role = NULL;
 	const char *dumpencoding = NULL;
+	const char *format;
 	trivalue	prompt_password = TRI_DEFAULT;
 	bool		data_only = false;
 	bool		globals_only = false;
 	bool		roles_only = false;
 	bool		tablespaces_only = false;
+	bool		directory_format = false;
 	PGconn	   *conn;
 	int			encoding;
 	const char *std_strings;
@@ -237,7 +243,7 @@ main(int argc, char *argv[])
 
 	pgdumpopts = createPQExpBuffer();
 
-	while ((c = getopt_long(argc, argv, "acd:E:f:gh:l:Op:rsS:tU:vwWx", long_options, &optindex)) != -1)
+	while ((c = getopt_long(argc, argv, "acd:E:f:F:gh:l:Op:rsS:tU:vwWx", long_options, &optindex)) != -1)
 	{
 		switch (c)
 		{
@@ -265,7 +271,17 @@ main(int argc, char *argv[])
 				appendPQExpBufferStr(pgdumpopts, " -f ");
 				appendShellString(pgdumpopts, filename);
 				break;
-
+			case 'F':
+				format = optarg;
+				if ((strcmp(format, "directory") == 0 || strcmp(format, "d") == 0))
+					directory_format = true;
+				else if (strcmp(format, "plain") != 0 || strcmp(format, "p") == 0)
+				{
+					pg_log_error("invalid format specified: %s", format);
+					pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+					exit_nicely(1);
+				}
+				break;
 			case 'g':
 				globals_only = true;
 				break;
@@ -497,9 +513,31 @@ main(int argc, char *argv[])
 						   &database_exclude_names);
 
 	/*
-	 * Open the output file if required, otherwise use stdout
+	 * Open the output file if required, otherwise use stdout.
 	 */
-	if (filename)
+	if (directory_format)
+	{
+		char	toc_path[MAXPGPATH];
+
+		/*
+		 * If directory format is specified then we must provide the directory
+		 * name.
+		 */
+		if (!filename || strcmp(filename, "") == 0)
+			pg_fatal("no output directory specified");
+
+		/* TODO: accept the empty existing directory. */
+		if (mkdir(filename, 0700) < 0)
+			pg_fatal("could not create directory \"%s\": %m",
+					 filename);
+
+		snprintf(toc_path, MAXPGPATH, "%s/global.dat", filename);
+
+		OPF = fopen(toc_path, "w");
+		if (!OPF)
+			pg_fatal("could not open global.dat file: %s", strerror(errno));
+	}
+	else if (filename)
 	{
 		OPF = fopen(filename, PG_BINARY_W);
 		if (!OPF)
@@ -607,7 +645,7 @@ main(int argc, char *argv[])
 	}
 
 	if (!globals_only && !roles_only && !tablespaces_only)
-		dumpDatabases(conn);
+		dumpDatabases(conn, directory_format);
 
 	PQfinish(conn);
 
@@ -620,7 +658,7 @@ main(int argc, char *argv[])
 		fclose(OPF);
 
 		/* sync the resulting file, errors are not fatal */
-		if (dosync)
+		if (dosync && !directory_format)
 			(void) fsync_fname(filename, false);
 	}
 
@@ -637,6 +675,7 @@ help(void)
 
 	printf(_("\nGeneral options:\n"));
 	printf(_("  -f, --file=FILENAME          output file name\n"));
+	printf(_("  -F, --format=d|p             output file format (directory, plain text (default))\n"));
 	printf(_("  -v, --verbose                verbose mode\n"));
 	printf(_("  -V, --version                output version information, then exit\n"));
 	printf(_("  --lock-wait-timeout=TIMEOUT  fail after waiting TIMEOUT for a table lock\n"));
@@ -1487,10 +1526,13 @@ expand_dbname_patterns(PGconn *conn,
  * Dump contents of databases.
  */
 static void
-dumpDatabases(PGconn *conn)
+dumpDatabases(PGconn *conn, bool directory_format)
 {
 	PGresult   *res;
 	int			i;
+	char		db_subdir[MAXPGPATH];
+	char		dbfilepath[MAXPGPATH];
+	FILE       *map_file;
 
 	/*
 	 * Skip databases marked not datallowconn, since we'd be unable to connect
@@ -1504,7 +1546,7 @@ dumpDatabases(PGconn *conn)
 	 * doesn't have some failure mode with --clean.
 	 */
 	res = executeQuery(conn,
-					   "SELECT datname "
+					   "SELECT datname, oid "
 					   "FROM pg_database d "
 					   "WHERE datallowconn AND datconnlimit != -2 "
 					   "ORDER BY (datname <> 'template1'), datname");
@@ -1512,9 +1554,30 @@ dumpDatabases(PGconn *conn)
 	if (PQntuples(res) > 0)
 		fprintf(OPF, "--\n-- Databases\n--\n\n");
 
+	/*
+	 * If directory format is specified then create a subdirectory under the
+	 * main directory and each database dump file will be created under the
+	 * subdirectory in archive mode as per single db pg_dump.
+	 */
+	if (directory_format)
+	{
+		char    map_file_path[MAXPGPATH];
+
+		snprintf(db_subdir, MAXPGPATH, "%s/databases", filename);
+		if (mkdir(db_subdir, 0755) != 0)
+			pg_log_error("could not create subdirectory \"%s\": %m", db_subdir);
+
+		/* Create a map file (to store dboid and dbname) */
+		snprintf(map_file_path, MAXPGPATH, "%s/map.dat", filename);
+		map_file = fopen(map_file_path, "w");
+		if (!map_file)
+			pg_fatal("could not open map file: %s", strerror(errno));
+	}
+
 	for (i = 0; i < PQntuples(res); i++)
 	{
 		char	   *dbname = PQgetvalue(res, i, 0);
+		char	   *oid = PQgetvalue(res, i, 1);
 		const char *create_opts;
 		int			ret;
 
@@ -1522,6 +1585,14 @@ dumpDatabases(PGconn *conn)
 		if (strcmp(dbname, "template0") == 0)
 			continue;
 
+		if (directory_format)
+		{
+			snprintf(dbfilepath, MAXPGPATH, "-f %s/%s", db_subdir, oid);
+
+			/* append dboid and dbname in map file. */
+			fprintf(map_file, "%s %s\n", oid, dbname);
+		}
+
 		/* Skip any explicitly excluded database */
 		if (simple_string_list_member(&database_exclude_names, dbname))
 		{
@@ -1531,7 +1602,8 @@ dumpDatabases(PGconn *conn)
 
 		pg_log_info("dumping database \"%s\"", dbname);
 
-		fprintf(OPF, "--\n-- Database \"%s\" dump\n--\n\n", dbname);
+		if (!directory_format)
+			fprintf(OPF, "--\n-- Database \"%s\" dump\n--\n\n", dbname);
 
 		/*
 		 * We assume that "template1" and "postgres" already exist in the
@@ -1549,20 +1621,21 @@ dumpDatabases(PGconn *conn)
 			{
 				create_opts = "";
 				/* Since pg_dump won't emit a \connect command, we must */
-				fprintf(OPF, "\\connect %s\n\n", dbname);
+				if (!directory_format)
+					fprintf(OPF, "\\connect %s\n\n", dbname);
 			}
 		}
 		else
 			create_opts = "--create";
 
-		if (filename)
+		if (!directory_format && filename)
 			fclose(OPF);
 
-		ret = runPgDump(dbname, create_opts);
+		ret = runPgDump(dbname, create_opts, dbfilepath);
 		if (ret != 0)
 			pg_fatal("pg_dump failed on database \"%s\", exiting", dbname);
 
-		if (filename)
+		if (!directory_format && filename)
 		{
 			OPF = fopen(filename, PG_BINARY_A);
 			if (!OPF)
@@ -1571,6 +1644,10 @@ dumpDatabases(PGconn *conn)
 		}
 	}
 
+	/* close map file */
+	if (directory_format)
+		fclose(map_file);
+
 	PQclear(res);
 }
 
@@ -1580,7 +1657,7 @@ dumpDatabases(PGconn *conn)
  * Run pg_dump on dbname, with specified options.
  */
 static int
-runPgDump(const char *dbname, const char *create_opts)
+runPgDump(const char *dbname, const char *create_opts, char *dbfile)
 {
 	PQExpBufferData connstrbuf;
 	PQExpBufferData cmd;
@@ -1589,17 +1666,26 @@ runPgDump(const char *dbname, const char *create_opts)
 	initPQExpBuffer(&connstrbuf);
 	initPQExpBuffer(&cmd);
 
-	printfPQExpBuffer(&cmd, "\"%s\" %s %s", pg_dump_bin,
-					  pgdumpopts->data, create_opts);
-
-	/*
-	 * If we have a filename, use the undocumented plain-append pg_dump
-	 * format.
-	 */
-	if (filename)
-		appendPQExpBufferStr(&cmd, " -Fa ");
+	if (dbfile)
+	{
+		printfPQExpBuffer(&cmd, "\"%s\" %s %s", pg_dump_bin,
+						  dbfile, create_opts);
+		appendPQExpBufferStr(&cmd, " -F d ");
+	}
 	else
-		appendPQExpBufferStr(&cmd, " -Fp ");
+	{
+		printfPQExpBuffer(&cmd, "\"%s\" %s %s", pg_dump_bin,
+						pgdumpopts->data, create_opts);
+
+		/*
+		* If we have a filename, use the undocumented plain-append pg_dump
+		* format.
+		*/
+		if (filename)
+			appendPQExpBufferStr(&cmd, " -Fa ");
+		else
+			appendPQExpBufferStr(&cmd, " -Fp ");
+	}
 
 	/*
 	 * Append the database name to the already-constructed stem of connection
diff --git a/src/bin/pg_dump/pg_restore.c b/src/bin/pg_dump/pg_restore.c
index 88ae39d..55d1862 100644
--- a/src/bin/pg_dump/pg_restore.c
+++ b/src/bin/pg_dump/pg_restore.c
@@ -41,27 +41,65 @@
 #include "postgres_fe.h"
 
 #include <ctype.h>
+#include <sys/stat.h>
 #ifdef HAVE_TERMIOS_H
 #include <termios.h>
 #endif
 
+#include "common/connect.h"
+#include "compress_io.h"
+#include "common/string.h"
 #include "fe_utils/option_utils.h"
+#include "fe_utils/string_utils.h"
 #include "filter.h"
 #include "getopt_long.h"
 #include "parallel.h"
+#include "pg_backup_archiver.h"
 #include "pg_backup_utils.h"
 
+typedef struct SimpleDBoidListCell
+{
+	struct SimpleDBoidListCell *next;
+	Oid         dboid;
+	const char  *dbname;
+} SimpleDBoidListCell;
+
+typedef struct SimpleActionList
+{
+	SimpleDBoidListCell *head;
+	SimpleDBoidListCell *tail;
+} SimpleDBoidList;
+
+static void
+simple_dboid_list_append(SimpleDBoidList *list, Oid dboid, const char *dbname);
+
 static void usage(const char *progname);
 static void read_restore_filters(const char *filename, RestoreOptions *opts);
+static bool _fileExistsInDirectory(const char *dir, const char *filename);
+static bool restoreOneDatabase(const char *inputFileSpec,
+		RestoreOptions *opts, int numWorkers);
+static PGconn *connectDatabase(const char *dbname, const char *conn_string,
+		const char *pghost, const char *pgport, const char *pguser,
+		trivalue prompt_password, bool fail_on_error);
+static PGresult *executeQuery(PGconn *conn, const char *query);
+static int ReadOneStatement(StringInfo inBuf, FILE *f_glo);
+static int restoreAllDatabases(const char *dumpdirpath,
+		SimpleStringList database_exclude_names, RestoreOptions *opts,
+		int numWorkers);
+static void execute_global_sql_commands(PGconn *conn, const char *dumpdirpath);
+static int filter_dbnames_for_restore(SimpleDBoidList *dbname_oid_list,
+		SimpleStringList database_exclude_names);
+static int get_dbname_oid_list_from_mfile(const char *dumpdirpath,
+		SimpleDBoidList *dbname_oid_list);
+static void simple_dboid_list_append(SimpleDBoidList *list, Oid dboid,
+		const char *dbname);
 
 int
 main(int argc, char **argv)
 {
 	RestoreOptions *opts;
 	int			c;
-	int			exit_code;
 	int			numWorkers = 1;
-	Archive    *AH;
 	char	   *inputFileSpec;
 	static int	disable_triggers = 0;
 	static int	enable_row_security = 0;
@@ -77,11 +115,14 @@ main(int argc, char **argv)
 	static int	strict_names = 0;
 	bool		data_only = false;
 	bool		schema_only = false;
+	SimpleStringList    database_exclude_names = {NULL, NULL};
+	bool				globals_only = false;
 
 	struct option cmdopts[] = {
 		{"clean", 0, NULL, 'c'},
 		{"create", 0, NULL, 'C'},
 		{"data-only", 0, NULL, 'a'},
+		{"globals-only", 0, NULL, 'g'},
 		{"dbname", 1, NULL, 'd'},
 		{"exit-on-error", 0, NULL, 'e'},
 		{"exclude-schema", 1, NULL, 'N'},
@@ -128,6 +169,7 @@ main(int argc, char **argv)
 		{"no-security-labels", no_argument, &no_security_labels, 1},
 		{"no-subscriptions", no_argument, &no_subscriptions, 1},
 		{"filter", required_argument, NULL, 4},
+		{"exclude-database", required_argument, NULL, 6},
 
 		{NULL, 0, NULL, 0}
 	};
@@ -156,7 +198,7 @@ main(int argc, char **argv)
 		}
 	}
 
-	while ((c = getopt_long(argc, argv, "acCd:ef:F:h:I:j:lL:n:N:Op:P:RsS:t:T:U:vwWx1",
+	while ((c = getopt_long(argc, argv, "aAcCd:ef:F:gh:I:j:lL:n:N:Op:P:RsS:t:T:U:vwWx1",
 							cmdopts, NULL)) != -1)
 	{
 		switch (c)
@@ -183,11 +225,14 @@ main(int argc, char **argv)
 				if (strlen(optarg) != 0)
 					opts->formatName = pg_strdup(optarg);
 				break;
+			case 'g':
+				/* restore only global.dat file from directory */
+				globals_only = true;
+				break;
 			case 'h':
 				if (strlen(optarg) != 0)
 					opts->cparams.pghost = pg_strdup(optarg);
 				break;
-
 			case 'j':			/* number of restore jobs */
 				if (!option_parse_int(optarg, "-j/--jobs", 1,
 									  PG_MAX_JOBS,
@@ -302,6 +347,14 @@ main(int argc, char **argv)
 					exit(1);
 				opts->exit_on_error = true;
 				break;
+			case 6:
+				/* list of databases those needs to skip while restoring */
+				simple_string_list_append(&database_exclude_names, optarg);
+				/*
+				 * XXX: TODO as of now, considering only db names but we can
+				 * implement for patterns also.
+				 */
+				break;
 
 			default:
 				/* getopt_long already emitted a complaint */
@@ -329,6 +382,16 @@ main(int argc, char **argv)
 	if (!opts->cparams.dbname && !opts->filename && !opts->tocSummary)
 		pg_fatal("one of -d/--dbname and -f/--file must be specified");
 
+	if (database_exclude_names.head != NULL && globals_only)
+	{
+		pg_log_error("option --exclude-database cannot be used together with -g/--globals-only");
+		pg_log_error_hint("Try \"%s --help\" for more information.", progname);
+		exit_nicely(1);
+	}
+
+	if (globals_only && opts->cparams.dbname == NULL)
+		pg_fatal("option -g/--globals-only requires option -d/--dbname");
+
 	/* Should get at most one of -d and -f, else user is confused */
 	if (opts->cparams.dbname)
 	{
@@ -406,6 +469,68 @@ main(int argc, char **argv)
 		}
 	}
 
+	/*
+	 * If directory format, then first check that toc.dat file exist or not?
+	 *
+	 * if toc.dat exist, then no need to check for map.dat and global.dat
+	 *
+	 */
+	if (opts->format == archDirectory &&
+			inputFileSpec != NULL &&
+			!_fileExistsInDirectory(inputFileSpec, "toc.dat"))
+	{
+		/* if global.dat and map.dat are exist, then open them */
+		if (_fileExistsInDirectory(pg_strdup(inputFileSpec), "global.dat")
+				&& _fileExistsInDirectory(pg_strdup(inputFileSpec), "map.dat"))
+		{
+			/* Found the global.dat and map.dat file so process. */
+			PGconn				*conn = NULL;
+
+			if (opts->cparams.dbname == NULL)
+				pg_fatal(" -d/--dbanme should be given if using dump of dumpall and global.dat");
+
+			if (opts->createDB != 1)
+				pg_fatal("option -C/--create should be specified if using dump of dumpall with global.dat");
+
+			/* Connect to database so that we can execute global.dat */
+			conn = connectDatabase(opts->cparams.dbname, NULL,
+					opts->cparams.pghost, opts->cparams.pgport, opts->cparams.username,
+					TRI_DEFAULT, false);
+
+			if (!conn)
+				pg_fatal("could not connect to database \"%s\"", opts->cparams.dbname);
+
+			/* Open global.dat file and execute all the sql commands */
+			execute_global_sql_commands(conn, inputFileSpec);
+
+			/* Close the db connection as we are done with globals */
+			PQfinish(conn);
+
+			/* if globals-only, then return from here */
+			if (globals_only)
+				return 0;
+
+			/* Now restore all the databases from map.dat file */
+			return restoreAllDatabases(inputFileSpec, database_exclude_names,
+					opts, numWorkers);
+		}/* end if */
+	}/* end if */
+
+	return restoreOneDatabase(inputFileSpec, opts, numWorkers);
+}
+
+/*
+ * restoreOneDatabase
+ *
+ * This will restore one database using toc.dat file.
+ */
+static bool
+restoreOneDatabase(const char *inputFileSpec, RestoreOptions *opts,
+		int numWorkers)
+{
+	Archive		*AH;
+	bool		exit_code;
+
 	AH = OpenArchive(inputFileSpec, opts->format);
 
 	SetArchiveOptions(AH, NULL, opts);
@@ -471,6 +596,7 @@ usage(const char *progname)
 	printf(_("  -c, --clean                  clean (drop) database objects before recreating\n"));
 	printf(_("  -C, --create                 create the target database\n"));
 	printf(_("  -e, --exit-on-error          exit on error, default is to continue\n"));
+	printf(_("  -g, --globals-only           restore only global objects, no databases\n"));
 	printf(_("  -I, --index=NAME             restore named index\n"));
 	printf(_("  -j, --jobs=NUM               use this many parallel jobs to restore\n"));
 	printf(_("  -L, --use-list=FILENAME      use table of contents from this file for\n"
@@ -483,6 +609,7 @@ usage(const char *progname)
 	printf(_("  -S, --superuser=NAME         superuser user name to use for disabling triggers\n"));
 	printf(_("  -t, --table=NAME             restore named relation (table, view, etc.)\n"));
 	printf(_("  -T, --trigger=NAME           restore named trigger\n"));
+	printf(_("  --exclude-database=NAME      exclude databases whose name matches with name\n"));
 	printf(_("  -x, --no-privileges          skip restoration of access privileges (grant/revoke)\n"));
 	printf(_("  -1, --single-transaction     restore as a single transaction\n"));
 	printf(_("  --disable-triggers           disable triggers during data-only restore\n"));
@@ -621,3 +748,529 @@ read_restore_filters(const char *filename, RestoreOptions *opts)
 
 	filter_free(&fstate);
 }
+
+static bool
+_fileExistsInDirectory(const char *dir, const char *filename)
+{
+	struct stat		st;
+	char			 buf[MAXPGPATH];
+
+	if (snprintf(buf, MAXPGPATH, "%s/%s", dir, filename) >= MAXPGPATH)
+		pg_fatal("directory name too long: \"%s\"", dir);
+
+	return (stat(buf, &st) == 0 && S_ISREG(st.st_mode));
+}
+
+/*
+ * Make a database connection with the given parameters.  An
+ * interactive password prompt is automatically issued if required.
+ *
+ * If fail_on_error is false, we return NULL without printing any message
+ * on failure, but preserve any prompted password for the next try.
+ *
+ */
+static PGconn *
+connectDatabase(const char *dbname, const char *connection_string,
+				const char *pghost, const char *pgport, const char *pguser,
+				trivalue prompt_password, bool fail_on_error)
+{
+	PGconn	   *conn;
+	bool		new_pass;
+	const char *remoteversion_str;
+	int			my_version;
+	const char **keywords = NULL;
+	const char **values = NULL;
+	PQconninfoOption *conn_opts = NULL;
+	static char *password = NULL;
+	static int  server_version;
+
+	if (prompt_password == TRI_YES && !password)
+		password = simple_prompt("Password: ", false);
+
+	/*
+	 * Start the connection.  Loop until we have a password if requested by
+	 * backend.
+	 */
+	do
+	{
+		int			argcount = 6;
+		PQconninfoOption *conn_opt;
+		char	   *err_msg = NULL;
+		int			i = 0;
+
+		free(keywords);
+		free(values);
+		PQconninfoFree(conn_opts);
+
+		/*
+		 * Merge the connection info inputs given in form of connection string
+		 * and other options.  Explicitly discard any dbname value in the
+		 * connection string; otherwise, PQconnectdbParams() would interpret
+		 * that value as being itself a connection string.
+		 */
+		if (connection_string)
+		{
+			conn_opts = PQconninfoParse(connection_string, &err_msg);
+			if (conn_opts == NULL)
+				pg_fatal("%s", err_msg);
+
+			for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+			{
+				if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
+					strcmp(conn_opt->keyword, "dbname") != 0)
+					argcount++;
+			}
+
+			keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
+			values = pg_malloc0((argcount + 1) * sizeof(*values));
+
+			for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
+			{
+				if (conn_opt->val != NULL && conn_opt->val[0] != '\0' &&
+					strcmp(conn_opt->keyword, "dbname") != 0)
+				{
+					keywords[i] = conn_opt->keyword;
+					values[i] = conn_opt->val;
+					i++;
+				}
+			}
+		}
+		else
+		{
+			keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
+			values = pg_malloc0((argcount + 1) * sizeof(*values));
+		}
+
+		if (pghost)
+		{
+			keywords[i] = "host";
+			values[i] = pghost;
+			i++;
+		}
+		if (pgport)
+		{
+			keywords[i] = "port";
+			values[i] = pgport;
+			i++;
+		}
+		if (pguser)
+		{
+			keywords[i] = "user";
+			values[i] = pguser;
+			i++;
+		}
+		if (password)
+		{
+			keywords[i] = "password";
+			values[i] = password;
+			i++;
+		}
+		if (dbname)
+		{
+			keywords[i] = "dbname";
+			values[i] = dbname;
+			i++;
+		}
+		keywords[i] = "fallback_application_name";
+		values[i] = progname;
+		i++;
+
+		new_pass = false;
+		conn = PQconnectdbParams(keywords, values, true);
+
+		if (!conn)
+			pg_fatal("could not connect to database \"%s\"", dbname);
+
+		if (PQstatus(conn) == CONNECTION_BAD &&
+			PQconnectionNeedsPassword(conn) &&
+			!password &&
+			prompt_password != TRI_NO)
+		{
+			PQfinish(conn);
+			password = simple_prompt("Password: ", false);
+			new_pass = true;
+		}
+	} while (new_pass);
+
+	/* check to see that the backend connection was successfully made */
+	if (PQstatus(conn) == CONNECTION_BAD)
+	{
+		if (fail_on_error)
+			pg_fatal("%s", PQerrorMessage(conn));
+		else
+		{
+			PQfinish(conn);
+
+			free(keywords);
+			free(values);
+			PQconninfoFree(conn_opts);
+
+			return NULL;
+		}
+	}
+
+	free(keywords);
+	free(values);
+	PQconninfoFree(conn_opts);
+
+	/* Check version */
+	remoteversion_str = PQparameterStatus(conn, "server_version");
+	if (!remoteversion_str)
+		pg_fatal("could not get server version");
+	server_version = PQserverVersion(conn);
+	if (server_version == 0)
+		pg_fatal("could not parse server version \"%s\"",
+				 remoteversion_str);
+
+	my_version = PG_VERSION_NUM;
+
+	/*
+	 * We allow the server to be back to 9.2, and up to any minor release of
+	 * our own major version.  (See also version check in pg_dump.c.)
+	 */
+	if (my_version != server_version
+		&& (server_version < 90200 ||
+			(server_version / 100) > (my_version / 100)))
+	{
+		pg_log_error("aborting because of server version mismatch");
+		pg_log_error_detail("server version: %s; %s version: %s",
+							remoteversion_str, progname, PG_VERSION);
+		exit_nicely(1);
+	}
+
+	PQclear(executeQuery(conn, ALWAYS_SECURE_SEARCH_PATH_SQL));
+
+	return conn;
+}
+
+/*
+ * Run a query, return the results, exit program on failure.
+ */
+static PGresult *
+executeQuery(PGconn *conn, const char *query)
+{
+	PGresult   *res;
+
+	pg_log_info("executing %s", query);
+
+	res = PQexec(conn, query);
+	if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
+	{
+		pg_log_error("query failed: %s", PQerrorMessage(conn));
+		pg_log_error_detail("Query was: %s", query);
+		PQfinish(conn);
+		exit_nicely(1);
+	}
+
+	return res;
+}
+
+/* ----------------
+ *	ReadOneStatement()
+ *
+ * This will start reading from passed file pointer using fgetc and read till
+ * semicolon(sql statement terminator for global.sql file)
+ *
+ *	EOF is returned if end-of-file input is seen; time to shut down.
+ * ----------------
+ */
+
+static int
+ReadOneStatement(StringInfo inBuf, FILE *f_glo)
+{
+	int			c;				/* character read from getc() */
+
+	resetStringInfo(inBuf);
+
+	/*
+	 * Read characters until EOF or the appropriate delimiter is seen.
+	 */
+	while ((c = fgetc(f_glo)) != EOF)
+	{
+		appendStringInfoChar(inBuf, (char) c);
+
+		if (c == '\n')
+		{
+			if(inBuf->len > 1 &&
+					inBuf->data[inBuf->len - 2] == ';')
+				break;
+			else
+				continue;
+		}
+	}
+
+	/* No input before EOF signal means time to quit. */
+	if (c == EOF && inBuf->len == 0)
+		return EOF;
+
+	/* Add '\0' to make it look the same as message case. */
+	appendStringInfoChar(inBuf, (char) '\0');
+
+	return 'Q';
+}
+
+/*
+ * This will remove names from all dblist that are given with exclude-database
+ * option.
+ *
+ * returns number of dbnames those will be restored.
+ */
+static int
+filter_dbnames_for_restore(SimpleDBoidList *dbname_oid_list,
+		SimpleStringList database_exclude_names)
+{
+	int					countdb = 0;
+	SimpleDBoidListCell	*cell = dbname_oid_list->head;
+	SimpleDBoidListCell	*precell = NULL;
+
+	/* Return 0 if there is no db to restore. */
+	if (cell == NULL)
+		return 0;
+
+	while (cell != NULL)
+	{
+		bool	skip_db = false;
+
+		/* Now match this dbname with exclude-database list. */
+		for (SimpleStringListCell *celldb = database_exclude_names.head; celldb; celldb = celldb->next)
+		{
+			if (strcmp(celldb->val, cell->dbname) == 0)
+			{
+				/*
+				 * As we need to skip this dbname so set flag to remove it from
+				 * list.
+				 */
+				skip_db = true;
+				break;
+			}
+		}
+
+		/* Increment count if db needs to be restored. */
+		if (!skip_db)
+		{
+			countdb++;
+			precell = cell;
+			cell = cell->next;
+		}
+		else
+		{
+			if (precell != NULL)
+			{
+				precell->next = cell->next;
+				pfree(cell);
+				cell = precell->next;
+			}
+			else
+			{
+				dbname_oid_list->head = cell->next;
+				pfree(cell);
+				cell = dbname_oid_list->head;
+			}
+		}
+	}
+
+	return countdb;
+}
+
+/*
+ * Open map.dat file and read line by line and then prepare a list of database
+ * names and correspoding dboid.
+ *
+ * Returns, total number of database names in map.dat file.
+ */
+static int
+get_dbname_oid_list_from_mfile(const char *dumpdirpath, SimpleDBoidList *dbname_oid_list)
+{
+	FILE    *pfile;
+	char    map_file_path[MAXPGPATH];
+	char    line[MAXPGPATH];
+	int     count = 0;
+
+	snprintf(map_file_path, MAXPGPATH, "%s/map.dat", dumpdirpath);
+
+	/* Open map.dat file. */
+	pfile = fopen(map_file_path, "r");
+
+	if (pfile == NULL)
+		pg_fatal("could not open map.dat file: %s", strerror(errno));
+
+	/* Append all the dbname and dboid to the list. */
+	while((fgets(line, MAXPGPATH, pfile)) != NULL)
+	{
+		Oid         dboid;
+		char        dbname[MAXPGPATH + 1];
+
+		/* Extract dbname and dboid from line */
+		sscanf(line, "%u %s" , &dboid, dbname);
+		pg_log_info("found dbname as :%s and dboid:%d in map.dat file while restoring", dbname, dboid);
+
+		/* Report error if file has any corrupted data. */
+		if (!OidIsValid(dboid) || strlen(dbname) == 0)
+			pg_fatal("invalid entry in map.dat file at line : %d", count + 1);
+
+		/*
+		 * TODO : before adding dbanme into list, we can verify that this db
+		 * needs to skipped for restore or not.
+		 */
+		simple_dboid_list_append(dbname_oid_list, dboid, dbname);
+		count++;
+	}
+
+	/* Close map.dat file. */
+	fclose(pfile);
+
+	return count;
+}
+
+/*
+ * This will restore databases those dumps are present in
+ * directory based on map.dat file mapping.
+ *
+ * This will skip restoring for databases that are specified with
+ * exclude-database option.
+ */
+static int
+restoreAllDatabases(const char *dumpdirpath,
+		SimpleStringList database_exclude_names, RestoreOptions *opts,
+		int numWorkers)
+{
+	SimpleDBoidList			dbname_oid_list = {NULL, NULL};
+	SimpleDBoidListCell		*cell;
+	int						exit_code = 0;
+	int						num_db_restore;
+	int						num_total_db;
+
+	num_total_db = get_dbname_oid_list_from_mfile(dumpdirpath, &dbname_oid_list);
+
+	/* If map.dat has no entry, return from here. */
+	if (dbname_oid_list.head == NULL)
+		return 0;
+
+	pg_log_info("found total %d database names in map.dat file", num_total_db);
+
+	/* Skip any explicitly excluded database. */
+	num_db_restore = filter_dbnames_for_restore(&dbname_oid_list, database_exclude_names);
+
+	/* Exit if no db needs to be restored. */
+	if (dbname_oid_list.head == NULL)
+		return 0;
+
+	pg_log_info("needs to restore %d databases out of %d databases", num_db_restore, num_total_db);
+
+	/*
+	 * XXX: TODO till now, we made a list of databases, those needs to be restored
+	 * after skipping names of exclude-database.  Now we can launch parallel
+	 * workers to restore these databases.
+	 */
+	cell = dbname_oid_list.head;
+
+	while(cell != NULL)
+	{
+		char		subdirpath[MAXPGPATH];
+		int			dbexit_code;
+
+		/*
+		 * We need to reset override_dbname so that objects can be restored into
+		 * already created database. (used with -d/--dbname option)
+		 */
+		if (opts->cparams.override_dbname)
+		{
+			pfree(opts->cparams.override_dbname);
+			opts->cparams.override_dbname = NULL;
+		}
+
+		snprintf(subdirpath, MAXPGPATH, "%s/databases/%u", dumpdirpath, cell->dboid);
+
+		/*
+		 * Database -d/--dbname is already created so reset createDB to ignore
+		 * database creation error.
+		 */
+		if (strcmp(cell->dbname, opts->cparams.dbname) == 0)
+			opts->createDB = 0;
+
+		pg_log_info("restoring database \"%s\"", cell->dbname);
+
+		dbexit_code = restoreOneDatabase(subdirpath, opts, numWorkers);
+
+		/* Store exit_code to report it back. */
+		if (exit_code == 0 && dbexit_code != 0)
+			exit_code = dbexit_code;
+
+		/* Set createDB option to create new database. */
+		if (strcmp(cell->dbname, opts->cparams.dbname) == 0)
+			opts->createDB = 1;
+
+		cell = cell->next;
+	} /* end while */
+
+	/* Log number of processed databases.*/
+	pg_log_info("number of restored databases are %d", num_db_restore);
+
+	return exit_code;
+}
+
+/*
+ * This will open global.dat file and will execute all global sql commands one
+ * by one statement.
+ *
+ * semicolon is considered as statement terminator.
+ */
+static void
+execute_global_sql_commands(PGconn *conn, const char *dumpdirpath)
+{
+	char            global_file_path[MAXPGPATH];
+	PGresult		*result;
+	StringInfoData	sqlstatement;
+	FILE			*pfile;
+
+	snprintf(global_file_path, MAXPGPATH, "%s/global.dat", dumpdirpath);
+
+	/* now open global.dat file */
+	pfile = fopen(global_file_path, "r");
+
+	if (pfile == NULL)
+		pg_fatal("could not open global.dat file: %s", strerror(errno));
+
+	/* Init sqlstatement to append commands */
+	initStringInfo(&sqlstatement);
+
+	/* Process file till EOF and execute sql statements */
+	while (ReadOneStatement(&sqlstatement, pfile) != EOF)
+	{
+		result = PQexec(conn, sqlstatement.data);
+
+		switch (PQresultStatus(result))
+		{
+			case PGRES_COMMAND_OK:
+			case PGRES_TUPLES_OK:
+			case PGRES_EMPTY_QUERY:
+			case PGRES_COPY_IN:
+				break;
+			default:
+				pg_log_error("could not execute query: %s \nCommand was: %s", PQerrorMessage(conn), sqlstatement.data);
+		}
+	}
+
+	fclose(pfile);
+}
+
+/*
+ * appends a node to the list in the end.
+ */
+static void
+simple_dboid_list_append(SimpleDBoidList *list, Oid dboid, const char *dbname)
+{
+	SimpleDBoidListCell *cell;
+
+	cell = pg_malloc_object(SimpleDBoidListCell);
+
+	cell->next = NULL;
+	cell->dboid = dboid;
+	cell->dbname = pg_strdup(dbname);
+
+	if (list->tail)
+		list->tail->next = cell;
+	else
+		list->head = cell;
+	list->tail = cell;
+}
-- 
1.8.3.1

