(2009/12/21 9:39), KaiGai Kohei wrote:
> (2009/12/19 12:05), Robert Haas wrote:
>> On Fri, Dec 18, 2009 at 9:48 PM, Tom Lane<t...@sss.pgh.pa.us>   wrote:
>>> Robert Haas<robertmh...@gmail.com>   writes:
>>>> Oh.  This is more complicated than it appeared on the surface.  It
>>>> seems that the string "BLOB COMMENTS" actually gets inserted into
>>>> custom dumps somewhere, so I'm not sure whether we can just change it.
>>>>    Was this issue discussed at some point before this was committed?
>>>> Changing it would seem to require inserting some backward
>>>> compatibility code here.  Another option would be to add a separate
>>>> section for "BLOB METADATA", and leave "BLOB COMMENTS" alone.  Can
>>>> anyone comment on what the Right Thing To Do is here?
>>>
>>> The BLOB COMMENTS label is, or was, correct for what it contained.
>>> If this patch has usurped it to contain other things
>>
>> It has.
>>
>>> I would argue
>>> that that is seriously wrong.  pg_dump already has a clear notion
>>> of how to handle ACLs for objects.  ACLs for blobs ought to be
>>> made to fit into that structure, not dumped in some random place
>>> because that saved a few lines of code.
>>
>> OK.  Hopefully KaiGai or Takahiro can suggest a fix.

The patch has grown larger than I expected, because large objects
are handled quite differently from every other object class.

Here are three points:

1) A new BLOB ACLS section was added.

It is a single-purpose section that holds the GRANT/REVOKE statements
on large objects, and the BLOB COMMENTS section was reverted to hold
only descriptions (comments).

Because we have to assume that a database may hold a massive number
of large objects, it is not reasonable to store their ACLs using
dumpACL(). dumpACL() chains an ACL entry onto the list of TOC entries,
and these are dumped later. That means pg_dump might have to register
a massive number of large objects in local memory.

Currently, we also store GRANT/REVOKE statements in the BLOB COMMENTS
section, which is confusing. Even if pg_restore is launched with the
--no-privileges option, it cannot ignore the GRANT/REVOKE statements
on large objects. This fix makes it possible to distinguish ACLs on
large objects from their other properties, and to handle them correctly.
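To illustrate the intended restore-side behavior, here is a minimal
sketch, not code from the patch. blob_section_wanted() and acls_skip
are hypothetical names; acls_skip stands in for the --no-privileges
setting.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/*
 * Hypothetical helper (not a function in pg_dump): decide whether a
 * blob-related TOC entry should be restored.  "desc" is the entry's
 * section label; "acls_skip" models the --no-privileges flag.
 */
bool
blob_section_wanted(const char *desc, bool acls_skip)
{
	assert(desc != NULL);

	if (strcmp(desc, "BLOBS") == 0 ||
		strcmp(desc, "BLOB COMMENTS") == 0)
		return true;

	/* GRANT/REVOKE statements are restored only when ACLs are wanted */
	if (strcmp(desc, "BLOB ACLS") == 0)
		return !acls_skip;

	return false;				/* not a blob section at all */
}
```

With a separate label, skipping privileges becomes a plain string
comparison on the section name instead of parsing the section body.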

2) The BLOBS section is now split per database user.

Currently, the BLOBS section has no information about the owner of
the large objects to be restored. So we tried to alter their
ownership in the BLOB COMMENTS section, but that is incorrect.

The --use-set-session-authorization option requires object ownership
to be restored without ALTER ... OWNER TO statements.
So we need to set the correct database user name in the section
properties.

This patch renames hasBlobs() to getBlobs() and changes its purpose.
It registers DO_BLOBS, DO_BLOB_COMMENTS and DO_BLOB_ACLS objects for
each owner of large objects, as necessary.
For example, if there are five users owning large objects, getBlobs()
registers one TOC entry of each kind per user (five of each), and
dumpBlobs(), dumpBlobComments() and dumpBlobAcls() are each invoked
five times, once per user name.
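The per-owner registration can be sketched roughly as below. This is a
simplified model, not the patch itself: BlobEntry, EntryKind and
register_blob_entries() are hypothetical names, and want_acls stands
in for the patch's condition that ACL entries are skipped for data-only
dumps, for --no-privileges, and for servers older than 8.5.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for the patch's DO_* object types. */
typedef enum
{
	ENTRY_BLOBS,
	ENTRY_BLOB_COMMENTS,
	ENTRY_BLOB_ACLS
} EntryKind;

typedef struct
{
	EntryKind	kind;
	const char *owner;			/* role that owns the large objects */
} BlobEntry;

/*
 * Fill "out" with one entry of each kind per owner and return the
 * number of entries written.  Stops early if "out" (capacity "cap")
 * cannot hold a complete set of entries for the next owner.
 */
size_t
register_blob_entries(const char **owners, size_t nowners,
					  bool want_acls, BlobEntry *out, size_t cap)
{
	size_t		n = 0;
	size_t		per_owner = want_acls ? 3 : 2;

	assert(owners != NULL && out != NULL);

	for (size_t i = 0; i < nowners; i++)
	{
		if (cap - n < per_owner)
			break;				/* caller's array is full */

		out[n++] = (BlobEntry) {ENTRY_BLOBS, owners[i]};
		out[n++] = (BlobEntry) {ENTRY_BLOB_COMMENTS, owners[i]};
		if (want_acls)
			out[n++] = (BlobEntry) {ENTRY_BLOB_ACLS, owners[i]};
	}
	return n;
}
```

The number of entries grows with the number of distinct owners, not
with the number of large objects, which is the point of registering
per owner.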

3) _LoadBlobs()

For regular database object classes, _printTocEntry() can inject an
"ALTER xxx OWNER TO ..." statement for the restored object, based on
the ownership described in the section header.
However, we cannot use this infrastructure for large objects as-is,
because one BLOBS section can restore multiple large objects.

_LoadBlobs() is the routine that restores the large objects within a
given section. This patch modifies it to inject an "ALTER LARGE
OBJECT <loid> OWNER TO <user>" statement for each large object,
based on the ownership of the section
(unless --use-set-session-authorization is given).
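As a rough standalone sketch of the statement emitted per large object:
format_blob_owner() is a hypothetical helper written for illustration.
The patch itself calls ahprintf() directly inside each _LoadBlobs().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

/*
 * Hypothetical helper: write the ownership statement for one restored
 * large object into "buf".  Returns snprintf's result, or 0 when
 * session authorization is used and nothing is emitted.
 */
int
format_blob_owner(char *buf, size_t bufsize, unsigned int loid,
				  const char *owner, bool use_setsessauth)
{
	assert(buf != NULL && bufsize > 0 && owner != NULL);

	buf[0] = '\0';
	if (use_setsessauth)
		return 0;				/* ownership comes from the session user */

	/* NOTE: like the patch, this sketch does not quote the role name. */
	return snprintf(buf, bufsize,
					"ALTER LARGE OBJECT %u OWNER TO %s;\n\n", loid, owner);
}
```

Since every large object in a section shares the section's owner, the
same owner string is reused for each loid read from the archive.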


$ diffstat pgsql-fix-pg_dump-blob-privs.patch
 pg_backup_archiver.c |    4
 pg_backup_custom.c   |   11 !
 pg_backup_files.c    |    9 !
 pg_backup_tar.c      |    9 !
 pg_dump.c            |  312 +++++++----!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 pg_dump.h            |    9 !
 pg_dump_sort.c       |    8 !
 7 files changed, 68 insertions(+), 25 deletions(-), 269 modifications(!)

Thanks,
-- 
OSS Platform Development Division, NEC
KaiGai Kohei <kai...@ak.jp.nec.com>
*** base/src/bin/pg_dump/pg_dump.h	(revision 2512)
--- base/src/bin/pg_dump/pg_dump.h	(working copy)
***************
*** 116,122 ****
  	DO_FOREIGN_SERVER,
  	DO_DEFAULT_ACL,
  	DO_BLOBS,
! 	DO_BLOB_COMMENTS
  } DumpableObjectType;
  
  typedef struct _dumpableObject
--- 116,123 ----
  	DO_FOREIGN_SERVER,
  	DO_DEFAULT_ACL,
  	DO_BLOBS,
! 	DO_BLOB_COMMENTS,
! 	DO_BLOB_ACLS,
  } DumpableObjectType;
  
  typedef struct _dumpableObject
***************
*** 442,447 ****
--- 443,454 ----
  	char	   *defaclacl;
  } DefaultACLInfo;
  
+ typedef struct _blobsInfo
+ {
+ 	DumpableObject dobj;
+ 	char	   *rolname;
+ } BlobsInfo;
+ 
  /* global decls */
  extern bool force_quotes;		/* double-quotes for identifiers flag */
  extern bool g_verbose;			/* verbose flag */
*** base/src/bin/pg_dump/pg_backup_tar.c	(revision 2512)
--- base/src/bin/pg_dump/pg_backup_tar.c	(working copy)
***************
*** 104,110 ****
  
  static const char *modulename = gettext_noop("tar archiver");
  
! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  
  static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
  static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH);
--- 104,110 ----
  
  static const char *modulename = gettext_noop("tar archiver");
  
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  
  static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
  static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH);
***************
*** 700,712 ****
  	}
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
--- 700,712 ----
  	}
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, te, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
***************
*** 737,742 ****
--- 737,745 ----
  					ahwrite(buf, 1, cnt, AH);
  				}
  				EndRestoreBlob(AH, oid);
+ 				if (!ropt->use_setsessauth)
+ 					ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ 							 oid, te->owner);
  				foundBlob = true;
  			}
  			tarClose(AH, th);
*** base/src/bin/pg_dump/pg_dump_sort.c	(revision 2512)
--- base/src/bin/pg_dump/pg_dump_sort.c	(working copy)
***************
*** 93,99 ****
  	15,							/* DO_FOREIGN_SERVER */
  	27,							/* DO_DEFAULT_ACL */
  	20,							/* DO_BLOBS */
! 	21							/* DO_BLOB_COMMENTS */
  };
  
  
--- 93,100 ----
  	15,							/* DO_FOREIGN_SERVER */
  	27,							/* DO_DEFAULT_ACL */
  	20,							/* DO_BLOBS */
! 	21,							/* DO_BLOB_COMMENTS */
! 	28,							/* DO_BLOB_ACLS */
  };
  
  
***************
*** 1156,1161 ****
--- 1157,1167 ----
  					 "BLOB COMMENTS  (ID %d)",
  					 obj->dumpId);
  			return;
+ 		case DO_BLOB_ACLS:
+ 			snprintf(buf, bufsize,
+ 					 "BLOB ACLS  (ID %d)",
+ 					 obj->dumpId);
+ 			return;
  	}
  	/* shouldn't get here */
  	snprintf(buf, bufsize,
*** base/src/bin/pg_dump/pg_backup_files.c	(revision 2512)
--- base/src/bin/pg_dump/pg_backup_files.c	(working copy)
***************
*** 66,72 ****
  } lclTocEntry;
  
  static const char *modulename = gettext_noop("file archiver");
! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname);
  
  /*
--- 66,72 ----
  } lclTocEntry;
  
  static const char *modulename = gettext_noop("file archiver");
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname);
  
  /*
***************
*** 330,336 ****
  		return;
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
--- 330,336 ----
  		return;
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, te, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
***************
*** 365,371 ****
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
--- 365,371 ----
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
***************
*** 385,390 ****
--- 385,393 ----
  		StartRestoreBlob(AH, oid, ropt->dropSchema);
  		_PrintFileData(AH, fname, ropt);
  		EndRestoreBlob(AH, oid);
+ 		if (!ropt->use_setsessauth)
+ 			ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ 					 oid, te->owner);
  		_getBlobTocEntry(AH, &oid, fname);
  	}
  
*** base/src/bin/pg_dump/pg_backup_archiver.c	(revision 2512)
--- base/src/bin/pg_dump/pg_backup_archiver.c	(working copy)
***************
*** 519,525 ****
  				_printTocEntry(AH, te, ropt, true, false);
  
  				if (strcmp(te->desc, "BLOBS") == 0 ||
! 					strcmp(te->desc, "BLOB COMMENTS") == 0)
  				{
  					ahlog(AH, 1, "restoring %s\n", te->desc);
  
--- 519,527 ----
  				_printTocEntry(AH, te, ropt, true, false);
  
  				if (strcmp(te->desc, "BLOBS") == 0 ||
! 					strcmp(te->desc, "BLOB COMMENTS") == 0 ||
! 					(!ropt->aclsSkip &&
! 					 strcmp(te->desc, "BLOB ACLS") == 0))
  				{
  					ahlog(AH, 1, "restoring %s\n", te->desc);
  
*** base/src/bin/pg_dump/pg_backup_custom.c	(revision 2512)
--- base/src/bin/pg_dump/pg_backup_custom.c	(working copy)
***************
*** 54,60 ****
  static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
! static void _LoadBlobs(ArchiveHandle *AH, bool drop);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
--- 54,60 ----
  static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
***************
*** 498,504 ****
  			break;
  
  		case BLK_BLOBS:
! 			_LoadBlobs(AH, ropt->dropSchema);
  			break;
  
  		default:				/* Always have a default */
--- 498,504 ----
  			break;
  
  		case BLK_BLOBS:
! 			_LoadBlobs(AH, te, ropt);
  			break;
  
  		default:				/* Always have a default */
***************
*** 619,625 ****
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, bool drop)
  {
  	Oid			oid;
  
--- 619,625 ----
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  {
  	Oid			oid;
  
***************
*** 628,636 ****
  	oid = ReadInt(AH);
  	while (oid != 0)
  	{
! 		StartRestoreBlob(AH, oid, drop);
  		_PrintData(AH);
  		EndRestoreBlob(AH, oid);
  		oid = ReadInt(AH);
  	}
  
--- 628,639 ----
  	oid = ReadInt(AH);
  	while (oid != 0)
  	{
! 		StartRestoreBlob(AH, oid, ropt->dropSchema);
  		_PrintData(AH);
  		EndRestoreBlob(AH, oid);
+ 		if (!ropt->use_setsessauth)
+ 			ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ 					 oid, te->owner);
  		oid = ReadInt(AH);
  	}
  
*** base/src/bin/pg_dump/pg_dump.c	(revision 2513)
--- base/src/bin/pg_dump/pg_dump.c	(working copy)
***************
*** 190,198 ****
  static char *getFormattedTypeName(Oid oid, OidOptions opts);
  static char *myFormatType(const char *typname, int32 typmod);
  static const char *fmtQualifiedId(const char *schema, const char *id);
! static bool hasBlobs(Archive *AH);
  static int	dumpBlobs(Archive *AH, void *arg);
  static int	dumpBlobComments(Archive *AH, void *arg);
  static void dumpDatabase(Archive *AH);
  static void dumpEncoding(Archive *AH);
  static void dumpStdStrings(Archive *AH);
--- 190,199 ----
  static char *getFormattedTypeName(Oid oid, OidOptions opts);
  static char *myFormatType(const char *typname, int32 typmod);
  static const char *fmtQualifiedId(const char *schema, const char *id);
! static void getBlobs(Archive *AH);
  static int	dumpBlobs(Archive *AH, void *arg);
  static int	dumpBlobComments(Archive *AH, void *arg);
+ static int	dumpBlobAcls(Archive *AH, void *arg);
  static void dumpDatabase(Archive *AH);
  static void dumpEncoding(Archive *AH);
  static void dumpStdStrings(Archive *AH);
***************
*** 695,720 ****
  			getTableDataFKConstraints();
  	}
  
! 	if (outputBlobs && hasBlobs(g_fout))
! 	{
! 		/* Add placeholders to allow correct sorting of blobs */
! 		DumpableObject *blobobj;
! 		DumpableObject *blobcobj;
  
- 		blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
- 		blobobj->objType = DO_BLOBS;
- 		blobobj->catId = nilCatalogId;
- 		AssignDumpId(blobobj);
- 		blobobj->name = strdup("BLOBS");
- 
- 		blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
- 		blobcobj->objType = DO_BLOB_COMMENTS;
- 		blobcobj->catId = nilCatalogId;
- 		AssignDumpId(blobcobj);
- 		blobcobj->name = strdup("BLOB COMMENTS");
- 		addObjectDependency(blobcobj, blobobj->dumpId);
- 	}
- 
  	/*
  	 * Collect dependency data to assist in ordering the objects.
  	 */
--- 696,704 ----
  			getTableDataFKConstraints();
  	}
  
! 	if (outputBlobs)
! 		getBlobs(g_fout);
  
  	/*
  	 * Collect dependency data to assist in ordering the objects.
  	 */
***************
*** 1932,1966 ****
  
  
  /*
!  * hasBlobs:
   *	Test whether database contains any large objects
   */
! static bool
! hasBlobs(Archive *AH)
  {
! 	bool		result;
  	const char *blobQry;
  	PGresult   *res;
  
  	/* Make sure we are in proper schema */
  	selectSourceSchema("pg_catalog");
  
  	/* Check for BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
  	else if (AH->remoteVersion >= 70100)
! 		blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
  	else
! 		blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
  
  	res = PQexec(g_conn, blobQry);
  	check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
  
! 	result = PQntuples(res) > 0;
  
  	PQclear(res);
- 
- 	return result;
  }
  
  /*
--- 1916,1980 ----
  
  
  /*
!  * getBlobs:
   *	Test whether database contains any large objects
+  *  If exist, it adds BlobsInfo objects for each owners
   */
! static void
! getBlobs(Archive *AH)
  {
! 	BlobsInfo  *blobobj;
! 	BlobsInfo  *blobcobj;
! 	BlobsInfo  *blobaobj;
  	const char *blobQry;
  	PGresult   *res;
+ 	int			i;
  
  	/* Make sure we are in proper schema */
  	selectSourceSchema("pg_catalog");
  
  	/* Check for BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		blobQry = "SELECT DISTINCT pg_get_userbyid(lomowner)"
! 			" FROM pg_largeobject_metadata";
  	else if (AH->remoteVersion >= 70100)
! 		blobQry = "SELECT NULL FROM pg_largeobject LIMIT 1";
  	else
! 		blobQry = "SELECT NULL FROM pg_class WHERE relkind = 'l' LIMIT 1";
  
  	res = PQexec(g_conn, blobQry);
  	check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
  
! 	for (i = 0; i < PQntuples(res); i++)
! 	{
! 		blobobj = (BlobsInfo *) malloc(sizeof(BlobsInfo));
! 		blobobj->dobj.objType = DO_BLOBS;
! 		blobobj->dobj.catId = nilCatalogId;
! 		AssignDumpId(&blobobj->dobj);
! 		blobobj->dobj.name = strdup("BLOBS");
! 		blobobj->rolname = strdup(PQgetvalue(res, i, 0));
  
+ 		blobcobj = (BlobsInfo *) malloc(sizeof(BlobsInfo));
+ 		blobcobj->dobj.objType = DO_BLOB_COMMENTS;
+ 		blobcobj->dobj.catId = nilCatalogId;
+ 		AssignDumpId(&blobcobj->dobj);
+ 		blobcobj->dobj.name = strdup("BLOB COMMENTS");
+ 		blobcobj->rolname = strdup(PQgetvalue(res, i, 0));
+ 		addObjectDependency(&blobcobj->dobj, blobobj->dobj.dumpId);
+ 
+ 		if (AH->remoteVersion < 80500 || dataOnly || aclsSkip)
+ 			continue;
+ 
+ 		blobaobj = (BlobsInfo *) malloc(sizeof(BlobsInfo));
+ 		blobaobj->dobj.objType = DO_BLOB_ACLS;
+ 		blobaobj->dobj.catId = nilCatalogId;
+ 		AssignDumpId(&blobaobj->dobj);
+ 		blobaobj->dobj.name = strdup("BLOB ACLS");
+ 		blobaobj->rolname = strdup(PQgetvalue(res, i, 0));
+ 		addObjectDependency(&blobaobj->dobj, blobobj->dobj.dumpId);
+ 	}
+ 
  	PQclear(res);
  }
  
  /*
***************
*** 1970,1977 ****
  static int
  dumpBlobs(Archive *AH, void *arg)
  {
! 	const char *blobQry;
! 	const char *blobFetchQry;
  	PGresult   *res;
  	char		buf[LOBBUFSIZE];
  	int			i;
--- 1984,1993 ----
  static int
  dumpBlobs(Archive *AH, void *arg)
  {
! 	BlobsInfo  *binfo = (BlobsInfo *)arg;
! 	PQExpBuffer blobQry = createPQExpBuffer();
! 	const char *blobFetchQry = "FETCH 1000 IN bloboid";
! 	const char *blobCloseQry = "CLOSE bloboid";
  	PGresult   *res;
  	char		buf[LOBBUFSIZE];
  	int			i;
***************
*** 1985,2002 ****
  
  	/* Cursor to get all BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
  	else if (AH->remoteVersion >= 70100)
! 		blobQry = "DECLARE bloboid CURSOR FOR SELECT DISTINCT loid FROM pg_largeobject";
  	else
! 		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_class WHERE relkind = 'l'";
  
! 	res = PQexec(g_conn, blobQry);
! 	check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
  
- 	/* Command to fetch from cursor */
- 	blobFetchQry = "FETCH 1000 IN bloboid";
- 
  	do
  	{
  		PQclear(res);
--- 2001,2020 ----
  
  	/* Cursor to get all BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! 						  "SELECT oid, lomacl FROM pg_largeobject_metadata "
! 						  "WHERE pg_get_userbyid(lomowner) = '%s'",
! 						  binfo->rolname);
  	else if (AH->remoteVersion >= 70100)
! 		appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! 						  "SELECT DISTINCT loid, NULL FROM pg_largeobject");
  	else
! 		appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! 						  "SELECT oid, NULL FROM pg_class WHERE relkind = 'l'");
  
! 	res = PQexec(g_conn, blobQry->data);
! 	check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
  
  	do
  	{
  		PQclear(res);
***************
*** 2045,2058 ****
  
  	PQclear(res);
  
  	return 1;
  }
  
  /*
   * dumpBlobComments
!  *	dump all blob properties.
!  *  It has "BLOB COMMENTS" tag due to the historical reason, but note
!  *  that it is the routine to dump all the properties of blobs.
   *
   * Since we don't provide any way to be selective about dumping blobs,
   * there's no need to be selective about their comments either.  We put
--- 2063,2078 ----
  
  	PQclear(res);
  
+ 	/* Cleanup cursor */
+ 	res = PQexec(g_conn, blobCloseQry);
+ 	check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
+ 
  	return 1;
  }
  
  /*
   * dumpBlobComments
!  *	dump all blob comments.
   *
   * Since we don't provide any way to be selective about dumping blobs,
   * there's no need to be selective about their comments either.  We put
***************
*** 2061,2069 ****
  static int
  dumpBlobComments(Archive *AH, void *arg)
  {
! 	const char *blobQry;
! 	const char *blobFetchQry;
  	PQExpBuffer cmdQry = createPQExpBuffer();
  	PGresult   *res;
  	int			i;
  
--- 2081,2091 ----
  static int
  dumpBlobComments(Archive *AH, void *arg)
  {
! 	char	   *rolname = ((BlobsInfo *)arg)->rolname;
  	PQExpBuffer cmdQry = createPQExpBuffer();
+ 	PQExpBuffer blobQry = createPQExpBuffer();
+ 	const char *blobFetchQry = "FETCH 100 IN blobcmt";
+ 	const char *blobCloseQry = "CLOSE blobcmt";
  	PGresult   *res;
  	int			i;
  
***************
*** 2075,2113 ****
  
  	/* Cursor to get all BLOB comments */
  	if (AH->remoteVersion >= 80500)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 			"obj_description(oid, 'pg_largeobject'), "
! 			"pg_get_userbyid(lomowner), lomacl "
! 			"FROM pg_largeobject_metadata";
  	else if (AH->remoteVersion >= 70300)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 			"obj_description(loid, 'pg_largeobject'), NULL, NULL "
! 			"FROM (SELECT DISTINCT loid FROM "
! 			"pg_description d JOIN pg_largeobject l ON (objoid = loid) "
! 			"WHERE classoid = 'pg_largeobject'::regclass) ss";
  	else if (AH->remoteVersion >= 70200)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 			"obj_description(loid, 'pg_largeobject'), NULL, NULL "
! 			"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
  	else if (AH->remoteVersion >= 70100)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 			"obj_description(loid), NULL, NULL "
! 			"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
  	else
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 			"	( "
! 			"		SELECT description "
! 			"		FROM pg_description pd "
! 			"		WHERE pd.objoid=pc.oid "
! 			"	), NULL, NULL "
! 			"FROM pg_class pc WHERE relkind = 'l'";
  
! 	res = PQexec(g_conn, blobQry);
! 	check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
  
- 	/* Command to fetch from cursor */
- 	blobFetchQry = "FETCH 100 IN blobcmt";
- 
  	do
  	{
  		PQclear(res);
--- 2097,2137 ----
  
  	/* Cursor to get all BLOB comments */
  	if (AH->remoteVersion >= 80500)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 						  "obj_description(oid, 'pg_largeobject') "
! 						  "FROM pg_largeobject_metadata "
! 						  "WHERE pg_get_userbyid(lomowner) = '%s'", rolname);
  	else if (AH->remoteVersion >= 70300)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 						  "obj_description(loid, 'pg_largeobject') "
! 						  "FROM (SELECT DISTINCT loid FROM "
! 						  "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
! 						  "WHERE classoid = 'pg_largeobject'::regclass) ss");
  	else if (AH->remoteVersion >= 70200)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 						  "obj_description(loid, 'pg_largeobject') "
! 						  "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss");
  	else if (AH->remoteVersion >= 70100)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 						  "obj_description(loid) "
! 						  "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss");
  	else
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 						  "	( "
! 						  "		SELECT description "
! 						  "		FROM pg_description pd "
! 						  "		WHERE pd.objoid=pc.oid "
! 						  "	) "
! 						  "FROM pg_class pc WHERE relkind = 'l'");
  
! 	res = PQexec(g_conn, blobQry->data);
! 	check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
  
  	do
  	{
  		PQclear(res);
***************
*** 2121,2169 ****
  		{
  			Oid			blobOid = atooid(PQgetvalue(res, i, 0));
  			char	   *lo_comment = PQgetvalue(res, i, 1);
- 			char	   *lo_owner = PQgetvalue(res, i, 2);
- 			char	   *lo_acl = PQgetvalue(res, i, 3);
- 			char		lo_name[32];
  
  			resetPQExpBuffer(cmdQry);
  
! 			/* comment on the blob */
! 			if (!PQgetisnull(res, i, 1))
! 			{
! 				appendPQExpBuffer(cmdQry,
! 								  "COMMENT ON LARGE OBJECT %u IS ", blobOid);
! 				appendStringLiteralAH(cmdQry, lo_comment, AH);
! 				appendPQExpBuffer(cmdQry, ";\n");
! 			}
  
! 			/* dump blob ownership, if necessary */
! 			if (!PQgetisnull(res, i, 2))
! 			{
! 				appendPQExpBuffer(cmdQry,
! 								  "ALTER LARGE OBJECT %u OWNER TO %s;\n",
! 								  blobOid, lo_owner);
! 			}
  
! 			/* dump blob privileges, if necessary */
! 			if (!PQgetisnull(res, i, 3) &&
! 				!dataOnly && !aclsSkip)
! 			{
! 				snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
! 				if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
! 									  lo_acl, lo_owner, "",
! 									  AH->remoteVersion, cmdQry))
! 				{
! 					write_msg(NULL, "could not parse ACL (%s) for "
! 							  "large object %u", lo_acl, blobOid);
! 					exit_nicely();
! 				}
! 			}
  
! 			if (cmdQry->len > 0)
  			{
! 				appendPQExpBuffer(cmdQry, "\n");
! 				archputs(cmdQry->data, AH);
  			}
  		}
  	} while (PQntuples(res) > 0);
  
--- 2145,2244 ----
  		{
  			Oid			blobOid = atooid(PQgetvalue(res, i, 0));
  			char	   *lo_comment = PQgetvalue(res, i, 1);
  
+ 			/* comment on the blob, if necessary */
+ 			if (PQgetisnull(res, i, 1))
+ 				continue;
+ 
  			resetPQExpBuffer(cmdQry);
  
! 			appendPQExpBuffer(cmdQry,
! 							  "COMMENT ON LARGE OBJECT %u IS ", blobOid);
! 			appendStringLiteralAH(cmdQry, lo_comment, AH);
! 			appendPQExpBuffer(cmdQry, ";\n\n");
  
! 			archputs(cmdQry->data, AH);
! 		}
! 	} while (PQntuples(res) > 0);
  
! 	PQclear(res);
  
! 	archputs("\n", AH);
! 
! 	destroyPQExpBuffer(cmdQry);
! 
! 	/* Cleanup cursor */
! 	res = PQexec(g_conn, blobCloseQry);
! 	check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
! 
! 	return 1;
! }
! 
! /*
!  * dumpBlobAcls
!  *	dump all blob privileges.
!  *
!  * Since we don't provide any way to be selective about dumping blobs,
!  * there's no need to be selective about their privileges either.
!  * We put all the privileges into one big TOC entry.
!  */
! static int
! dumpBlobAcls(Archive *AH, void *arg)
! {
! 	char	   *rolname = ((BlobsInfo *)arg)->rolname;
! 	PQExpBuffer	cmdQry = createPQExpBuffer();
! 	PQExpBuffer blobQry = createPQExpBuffer();
! 	const char *blobFetchQry = "FETCH 100 IN blobacl";
! 	const char *blobCloseQry = "CLOSE blobacl";
! 	PGresult   *res;
! 	int			i;
! 
! 	if (g_verbose)
! 		write_msg(NULL, "saving large object permissions\n");
! 
! 	/* Cursor to get all BLOB permissions */
! 	if (AH->remoteVersion >= 80500)
! 		appendPQExpBuffer(blobQry, "DECLARE blobacl CURSOR FOR "
! 						  "SELECT oid, lomacl FROM pg_largeobject_metadata "
! 						  "WHERE pg_get_userbyid(lomowner) = '%s'", rolname);
! 	else
! 		return 1;	/* no need to dump anything < 8.5 */
! 
! 	/* Make sure we are in proper schema */
!     selectSourceSchema("pg_catalog");
! 
! 	/* Open cursor */
! 	res = PQexec(g_conn, blobQry->data);
!     check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
! 
! 	do
! 	{
! 		PQclear(res);
! 
! 		/* Do a fetch */
! 		res = PQexec(g_conn, blobFetchQry);
! 		check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
! 
! 		/* Process the tuples, if any */
! 		for (i = 0; i < PQntuples(res); i++)
! 		{
! 			char   *lo_oid = PQgetvalue(res, i, 0);
! 			char   *lo_acl = PQgetvalue(res, i, 1);
! 
! 			if (PQgetisnull(res, i, 1))
! 				continue;
! 
! 			resetPQExpBuffer(cmdQry);
! 
! 			if (!buildACLCommands(lo_oid, NULL, "LARGE OBJECT",
! 								  lo_acl, rolname, "",
! 								  AH->remoteVersion, cmdQry))
  			{
! 				write_msg(NULL, "could not parse ACL (%s) for "
! 						  "large object %s", lo_acl, lo_oid);
! 				exit_nicely();
  			}
+ 			archprintf(AH, "%s\n", cmdQry->data);
  		}
  	} while (PQntuples(res) > 0);
  
***************
*** 2173,2178 ****
--- 2248,2257 ----
  
  	destroyPQExpBuffer(cmdQry);
  
+ 	/* Cleanup */
+ 	res = PQexec(g_conn, blobCloseQry);
+ 	check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
+ 
  	return 1;
  }
  
***************
*** 6292,6311 ****
  			break;
  		case DO_BLOBS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL, "",
  						 false, "BLOBS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobs, NULL);
  			break;
  		case DO_BLOB_COMMENTS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL, "",
  						 false, "BLOB COMMENTS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobComments, NULL);
  			break;
  	}
  }
  
--- 6371,6401 ----
  			break;
  		case DO_BLOBS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL,
! 						 ((BlobsInfo *) dobj)->rolname,
  						 false, "BLOBS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobs, dobj);
  			break;
  		case DO_BLOB_COMMENTS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL,
! 						 ((BlobsInfo *) dobj)->rolname,
  						 false, "BLOB COMMENTS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobComments, dobj);
  			break;
+ 		case DO_BLOB_ACLS:
+ 			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
+ 						 dobj->name, NULL, NULL,
+ 						 ((BlobsInfo *) dobj)->rolname,
+ 						 false, "BLOB ACLS", SECTION_DATA,
+ 						 "", "", NULL,
+ 						 dobj->dependencies, dobj->nDeps,
+ 						 dumpBlobAcls, dobj);
+ 			break;
  	}
  }
  
