The attached patch is a revised version.

List of updates:
- cleanup: getBlobs() was renamed to getBlobOwners()
- cleanup: BlobsInfo was renamed to BlobOwnerInfo
- bugfix: pg_get_userbyid() in SQLs were replaced by username_subquery which
          constins a right subquery to obtain a username for the given id.
          It enables to run pg_dump under the concurrent role deletion.
- bugfix: Even if we give -a (--data-only) or -O (--no-owner) options,
          or archive does not contain Owner information, it tried to write
          out "ALTER LARGE OBJECT xxx OWNER TO ..." statement.
- bugfix: Even if we give -a (--data-only) or -x (--no-privileges) options,
          it tried to write out "BLOB ACLS" section.

The last two are the problems I noticed newly. It needs to introduce them.

The BLOB section can contain multiple definitions of large objects, unlike
any other object classes. It is also a reason why I had to group large
objects by database user.
The Owner tag of BLOB section is used to identify owner of the large objects
to be restored, and also used in --use-set-session-authorization mode.
However, we need to inject "ALTER LARGE OBJECT xxx OWNER TO ..." statement
for each lo_create() in _LoadBlobs(), because we cannot know how many large
objects are in the section before reading the archive.
But the last patch didn't pay mention about -a/-O option and an archive
which does not have Owner: tag.

The BLOB ACLS section is categorized to SECTION_DATA, it follows the manner
in BLOB COMMENTS behavior. In same reason, it has to handle the -a/-x option
by itself, but the last patch didn't handle it well.

BTW, here is a known issue. When we run pg_dump with -s(--schema-only),
it write out descriptions of regular object classes, but does not write
out the description of large objects.
It seems to me the description of large objects are considered as a part
of data, not properties. However, it might be inconsistent.

The reason of this behavior is all the BLOB dumps are categorized to
SECTION_DATA, so -s option informs pg_backup_archiver.c to skip routines
related to large objects.

However, it may be a time to consider this code into two steps.
In the schema section stage,
 - It creates empty large objects with lo_create()
 - It restores the description of the large objects.
 - It restores the ownership/privileges of the large objects.

In the date section stage,
 - It loads actual data contents into the empty large objects with lowrite().

Thanks,

(2010/01/21 19:42), Takahiro Itagaki wrote:
> 
> KaiGai Kohei<kai...@ak.jp.nec.com>  wrote:
> 
>>> I'm not sure whether we need to make groups for each owner of large objects.
>>> If I remember right, the primary issue was separating routines for dump
>>> BLOB ACLS from routines for BLOB COMMENTS, right? Why did you make the 
>>> change?
>>
>> When --use-set-session-authorization is specified, pg_restore tries to
>> change database role of the current session just before creation of
>> database objects to be restored.
>>
>> Ownership of the database objects are recorded in the section header,
>> and it informs pg_restore who should be owner of the objects to be
>> restored in this section.
>>
>> Then, pg_restore can generate ALTER xxx OWNER TO after creation, or
>> SET SESSION AUTHORIZATION before creation in runtime.
>> So, we cannot put creation of largeobjects with different ownership
>> in same section.
>>
>> It is the reason why we have to group largeobjects by database user.
> 
> Ah, I see.
> 
> Then... What happen if we drop or rename roles who have large objects
> during pg_dump? Does the patch still work? It uses pg_get_userbyid().
> 
> Regards,
> ---
> Takahiro Itagaki
> NTT Open Source Software Center
> 
> 
> 


-- 
OSS Platform Development Division, NEC
KaiGai Kohei <kai...@ak.jp.nec.com>
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 517,527 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
  			 */
  			if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
  			{
- 				_printTocEntry(AH, te, ropt, true, false);
- 
  				if (strcmp(te->desc, "BLOBS") == 0 ||
! 					strcmp(te->desc, "BLOB COMMENTS") == 0)
  				{
  					ahlog(AH, 1, "restoring %s\n", te->desc);
  
  					_selectOutputSchema(AH, "pg_catalog");
--- 517,535 ----
  			 */
  			if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
  			{
  				if (strcmp(te->desc, "BLOBS") == 0 ||
! 					strcmp(te->desc, "BLOB COMMENTS") == 0 ||
! 					strcmp(te->desc, "BLOB ACLS") == 0)
  				{
+ 					/*
+ 					 * We don't need to dump ACLs, if -a or -x cases.
+ 					 */
+ 					if (strcmp(te->desc, "BLOB ACLS") == 0 &&
+ 						(ropt->aclsSkip || ropt->dataOnly))
+ 						return retval;
+ 
+ 					_printTocEntry(AH, te, ropt, true, false);
+ 
  					ahlog(AH, 1, "restoring %s\n", te->desc);
  
  					_selectOutputSchema(AH, "pg_catalog");
***************
*** 530,535 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
--- 538,545 ----
  				}
  				else
  				{
+ 					_printTocEntry(AH, te, ropt, true, false);
+ 
  					_disableTriggersIfNecessary(AH, te, ropt);
  
  					/* Select owner and schema as necessary */
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
***************
*** 54,60 **** static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
  static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
! static void _LoadBlobs(ArchiveHandle *AH, bool drop);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
--- 54,60 ----
  static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
  static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _Clone(ArchiveHandle *AH);
  static void _DeClone(ArchiveHandle *AH);
  
***************
*** 498,504 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  			break;
  
  		case BLK_BLOBS:
! 			_LoadBlobs(AH, ropt->dropSchema);
  			break;
  
  		default:				/* Always have a default */
--- 498,504 ----
  			break;
  
  		case BLK_BLOBS:
! 			_LoadBlobs(AH, te, ropt);
  			break;
  
  		default:				/* Always have a default */
***************
*** 619,625 **** _PrintData(ArchiveHandle *AH)
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, bool drop)
  {
  	Oid			oid;
  
--- 619,625 ----
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  {
  	Oid			oid;
  
***************
*** 628,636 **** _LoadBlobs(ArchiveHandle *AH, bool drop)
  	oid = ReadInt(AH);
  	while (oid != 0)
  	{
! 		StartRestoreBlob(AH, oid, drop);
  		_PrintData(AH);
  		EndRestoreBlob(AH, oid);
  		oid = ReadInt(AH);
  	}
  
--- 628,640 ----
  	oid = ReadInt(AH);
  	while (oid != 0)
  	{
! 		StartRestoreBlob(AH, oid, ropt->dropSchema);
  		_PrintData(AH);
  		EndRestoreBlob(AH, oid);
+ 		if (!ropt->noOwner && !ropt->dataOnly &&
+ 			!ropt->use_setsessauth && strlen(te->owner) > 0)
+ 			ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ 					 oid, te->owner);
  		oid = ReadInt(AH);
  	}
  
*** a/src/bin/pg_dump/pg_backup_files.c
--- b/src/bin/pg_dump/pg_backup_files.c
***************
*** 66,72 **** typedef struct
  } lclTocEntry;
  
  static const char *modulename = gettext_noop("file archiver");
! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname);
  
  /*
--- 66,72 ----
  } lclTocEntry;
  
  static const char *modulename = gettext_noop("file archiver");
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname);
  
  /*
***************
*** 330,336 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  		return;
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
--- 330,336 ----
  		return;
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, te, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
***************
*** 365,371 **** _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char fname[K_STD_BUF_SIZE])
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
--- 365,371 ----
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
***************
*** 385,390 **** _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
--- 385,394 ----
  		StartRestoreBlob(AH, oid, ropt->dropSchema);
  		_PrintFileData(AH, fname, ropt);
  		EndRestoreBlob(AH, oid);
+ 		if (!ropt->noOwner && !ropt->dataOnly &&
+ 			!ropt->use_setsessauth && strlen(te->owner) > 0)
+ 			ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ 					 oid, te->owner);
  		_getBlobTocEntry(AH, &oid, fname);
  	}
  
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
***************
*** 100,106 **** typedef struct
  
  static const char *modulename = gettext_noop("tar archiver");
  
! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
  
  static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
  static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH);
--- 100,106 ----
  
  static const char *modulename = gettext_noop("tar archiver");
  
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
  
  static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
  static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH);
***************
*** 696,708 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  	}
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
--- 696,708 ----
  	}
  
  	if (strcmp(te->desc, "BLOBS") == 0)
! 		_LoadBlobs(AH, te, ropt);
  	else
  		_PrintFileData(AH, tctx->filename, ropt);
  }
  
  static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
  {
  	Oid			oid;
  	lclContext *ctx = (lclContext *) AH->formatData;
***************
*** 733,738 **** _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
--- 733,742 ----
  					ahwrite(buf, 1, cnt, AH);
  				}
  				EndRestoreBlob(AH, oid);
+ 				if (!ropt->noOwner && !ropt->dataOnly &&
+ 					!ropt->use_setsessauth && strlen(te->owner) > 0)
+ 					ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ 							 oid, te->owner);
  				foundBlob = true;
  			}
  			tarClose(AH, th);
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 190,198 **** static void selectSourceSchema(const char *schemaName);
  static char *getFormattedTypeName(Oid oid, OidOptions opts);
  static char *myFormatType(const char *typname, int32 typmod);
  static const char *fmtQualifiedId(const char *schema, const char *id);
! static bool hasBlobs(Archive *AH);
  static int	dumpBlobs(Archive *AH, void *arg);
  static int	dumpBlobComments(Archive *AH, void *arg);
  static void dumpDatabase(Archive *AH);
  static void dumpEncoding(Archive *AH);
  static void dumpStdStrings(Archive *AH);
--- 190,199 ----
  static char *getFormattedTypeName(Oid oid, OidOptions opts);
  static char *myFormatType(const char *typname, int32 typmod);
  static const char *fmtQualifiedId(const char *schema, const char *id);
! static void getBlobOwners(Archive *AH);
  static int	dumpBlobs(Archive *AH, void *arg);
  static int	dumpBlobComments(Archive *AH, void *arg);
+ static int	dumpBlobAcls(Archive *AH, void *arg);
  static void dumpDatabase(Archive *AH);
  static void dumpEncoding(Archive *AH);
  static void dumpStdStrings(Archive *AH);
***************
*** 701,725 **** main(int argc, char **argv)
  			getTableDataFKConstraints();
  	}
  
! 	if (outputBlobs && hasBlobs(g_fout))
! 	{
! 		/* Add placeholders to allow correct sorting of blobs */
! 		DumpableObject *blobobj;
! 		DumpableObject *blobcobj;
! 
! 		blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
! 		blobobj->objType = DO_BLOBS;
! 		blobobj->catId = nilCatalogId;
! 		AssignDumpId(blobobj);
! 		blobobj->name = strdup("BLOBS");
! 
! 		blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
! 		blobcobj->objType = DO_BLOB_COMMENTS;
! 		blobcobj->catId = nilCatalogId;
! 		AssignDumpId(blobcobj);
! 		blobcobj->name = strdup("BLOB COMMENTS");
! 		addObjectDependency(blobcobj, blobobj->dumpId);
! 	}
  
  	/*
  	 * Collect dependency data to assist in ordering the objects.
--- 702,709 ----
  			getTableDataFKConstraints();
  	}
  
! 	if (outputBlobs)
! 		getBlobOwners(g_fout);
  
  	/*
  	 * Collect dependency data to assist in ordering the objects.
***************
*** 1938,1972 **** dumpStdStrings(Archive *AH)
  
  
  /*
!  * hasBlobs:
   *	Test whether database contains any large objects
   */
! static bool
! hasBlobs(Archive *AH)
  {
! 	bool		result;
! 	const char *blobQry;
! 	PGresult   *res;
  
  	/* Make sure we are in proper schema */
  	selectSourceSchema("pg_catalog");
  
  	/* Check for BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
  	else if (AH->remoteVersion >= 70100)
! 		blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
  	else
! 		blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
  
! 	res = PQexec(g_conn, blobQry);
! 	check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
  
! 	result = PQntuples(res) > 0;
  
  	PQclear(res);
- 
- 	return result;
  }
  
  /*
--- 1922,1990 ----
  
  
  /*
!  * getBlobOwners:
   *	Test whether database contains any large objects
+  *  If exist, it adds BlobOwnerInfo objects for each owners
   */
! static void
! getBlobOwners(Archive *AH)
  {
! 	PQExpBuffer		blobQry = createPQExpBuffer();
! 	BlobOwnerInfo  *blobobj;
! 	BlobOwnerInfo  *blobcobj;
! 	BlobOwnerInfo  *blobaobj;
! 	PGresult	   *res;
! 	int				i;
  
  	/* Make sure we are in proper schema */
  	selectSourceSchema("pg_catalog");
  
  	/* Check for BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		appendPQExpBuffer(blobQry,
! 						  "SELECT DISTINCT (%s lomowner)"
! 						  " FROM pg_largeobject_metadata",
! 						  username_subquery);
  	else if (AH->remoteVersion >= 70100)
! 		appendPQExpBuffer(blobQry,
! 						  "SELECT NULL FROM pg_largeobject LIMIT 1");
  	else
! 		appendPQExpBuffer(blobQry,
! 						  "SELECT NULL FROM pg_class WHERE relkind = 'l' LIMIT 1");
  
! 	res = PQexec(g_conn, blobQry->data);
! 	check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK);
  
! 	for (i = 0; i < PQntuples(res); i++)
! 	{
! 		blobobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo));
! 		blobobj->dobj.objType = DO_BLOBS;
! 		blobobj->dobj.catId = nilCatalogId;
! 		AssignDumpId(&blobobj->dobj);
! 		blobobj->dobj.name = strdup("BLOBS");
! 		blobobj->rolname = strdup(PQgetvalue(res, i, 0));
! 
! 		blobcobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo));
! 		blobcobj->dobj.objType = DO_BLOB_COMMENTS;
! 		blobcobj->dobj.catId = nilCatalogId;
! 		AssignDumpId(&blobcobj->dobj);
! 		blobcobj->dobj.name = strdup("BLOB COMMENTS");
! 		blobcobj->rolname = strdup(PQgetvalue(res, i, 0));
! 		addObjectDependency(&blobcobj->dobj, blobobj->dobj.dumpId);
! 
! 		if (AH->remoteVersion >= 80500 && !dataOnly && !aclsSkip)
! 		{
! 			blobaobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo));
! 			blobaobj->dobj.objType = DO_BLOB_ACLS;
! 			blobaobj->dobj.catId = nilCatalogId;
! 			AssignDumpId(&blobaobj->dobj);
! 			blobaobj->dobj.name = strdup("BLOB ACLS");
! 			blobaobj->rolname = strdup(PQgetvalue(res, i, 0));
! 			addObjectDependency(&blobaobj->dobj, blobobj->dobj.dumpId);
! 		}
! 	}
  
  	PQclear(res);
  }
  
  /*
***************
*** 1976,1983 **** hasBlobs(Archive *AH)
  static int
  dumpBlobs(Archive *AH, void *arg)
  {
! 	const char *blobQry;
! 	const char *blobFetchQry;
  	PGresult   *res;
  	char		buf[LOBBUFSIZE];
  	int			i;
--- 1994,2003 ----
  static int
  dumpBlobs(Archive *AH, void *arg)
  {
! 	char	   *rolname = ((BlobOwnerInfo *)arg)->rolname;
! 	PQExpBuffer	blobQry = createPQExpBuffer();
! 	const char *blobFetchQry = "FETCH 1000 IN bloboid";
! 	const char *blobCloseQry = "CLOSE bloboid";
  	PGresult   *res;
  	char		buf[LOBBUFSIZE];
  	int			i;
***************
*** 1991,2007 **** dumpBlobs(Archive *AH, void *arg)
  
  	/* Cursor to get all BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
  	else if (AH->remoteVersion >= 70100)
! 		blobQry = "DECLARE bloboid CURSOR FOR SELECT DISTINCT loid FROM pg_largeobject";
  	else
! 		blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_class WHERE relkind = 'l'";
  
! 	res = PQexec(g_conn, blobQry);
! 	check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
! 
! 	/* Command to fetch from cursor */
! 	blobFetchQry = "FETCH 1000 IN bloboid";
  
  	do
  	{
--- 2011,2029 ----
  
  	/* Cursor to get all BLOB OIDs */
  	if (AH->remoteVersion >= 80500)
! 		appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! 						  "SELECT oid, lomacl FROM pg_largeobject_metadata "
! 						  "WHERE '%s' in (%s lomowner)",
! 						  rolname, username_subquery);
  	else if (AH->remoteVersion >= 70100)
! 		appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! 						  "SELECT DISTINCT loid, NULL FROM pg_largeobject");
  	else
! 		appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! 						  "SELECT oid, NULL FROM pg_class WHERE relkind = 'l'");
  
! 	res = PQexec(g_conn, blobQry->data);
! 	check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
  
  	do
  	{
***************
*** 2051,2064 **** dumpBlobs(Archive *AH, void *arg)
  
  	PQclear(res);
  
  	return 1;
  }
  
  /*
   * dumpBlobComments
!  *	dump all blob properties.
!  *  It has "BLOB COMMENTS" tag due to the historical reason, but note
!  *  that it is the routine to dump all the properties of blobs.
   *
   * Since we don't provide any way to be selective about dumping blobs,
   * there's no need to be selective about their comments either.  We put
--- 2073,2088 ----
  
  	PQclear(res);
  
+ 	/* Cleanup cursor */
+ 	res = PQexec(g_conn, blobCloseQry);
+ 	check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
+ 
  	return 1;
  }
  
  /*
   * dumpBlobComments
!  *	dump all blob comments.
   *
   * Since we don't provide any way to be selective about dumping blobs,
   * there's no need to be selective about their comments either.  We put
***************
*** 2067,2075 **** dumpBlobs(Archive *AH, void *arg)
  static int
  dumpBlobComments(Archive *AH, void *arg)
  {
! 	const char *blobQry;
! 	const char *blobFetchQry;
  	PQExpBuffer cmdQry = createPQExpBuffer();
  	PGresult   *res;
  	int			i;
  
--- 2091,2101 ----
  static int
  dumpBlobComments(Archive *AH, void *arg)
  {
! 	char	   *rolname = ((BlobOwnerInfo *)arg)->rolname;
  	PQExpBuffer cmdQry = createPQExpBuffer();
+ 	PQExpBuffer blobQry = createPQExpBuffer();
+ 	const char *blobFetchQry = "FETCH 100 IN blobcmt";
+ 	const char *blobCloseQry = "CLOSE blobcmt";
  	PGresult   *res;
  	int			i;
  
***************
*** 2081,2118 **** dumpBlobComments(Archive *AH, void *arg)
  
  	/* Cursor to get all BLOB comments */
  	if (AH->remoteVersion >= 80500)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 			"obj_description(oid, 'pg_largeobject'), "
! 			"pg_get_userbyid(lomowner), lomacl "
! 			"FROM pg_largeobject_metadata";
  	else if (AH->remoteVersion >= 70300)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 			"obj_description(loid, 'pg_largeobject'), NULL, NULL "
! 			"FROM (SELECT DISTINCT loid FROM "
! 			"pg_description d JOIN pg_largeobject l ON (objoid = loid) "
! 			"WHERE classoid = 'pg_largeobject'::regclass) ss";
  	else if (AH->remoteVersion >= 70200)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 			"obj_description(loid, 'pg_largeobject'), NULL, NULL "
! 			"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
  	else if (AH->remoteVersion >= 70100)
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 			"obj_description(loid), NULL, NULL "
! 			"FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
  	else
! 		blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 			"	( "
! 			"		SELECT description "
! 			"		FROM pg_description pd "
! 			"		WHERE pd.objoid=pc.oid "
! 			"	), NULL, NULL "
! 			"FROM pg_class pc WHERE relkind = 'l'";
! 
! 	res = PQexec(g_conn, blobQry);
! 	check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
! 
! 	/* Command to fetch from cursor */
! 	blobFetchQry = "FETCH 100 IN blobcmt";
  
  	do
  	{
--- 2107,2147 ----
  
  	/* Cursor to get all BLOB comments */
  	if (AH->remoteVersion >= 80500)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 						  "obj_description(oid, 'pg_largeobject') "
! 						  "FROM pg_largeobject_metadata "
! 						  "WHERE '%s' in (%s lomowner)",
! 						  rolname, username_subquery);
  	else if (AH->remoteVersion >= 70300)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 						  "obj_description(loid, 'pg_largeobject') "
! 						  "FROM (SELECT DISTINCT loid FROM "
! 						  "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
! 						  "WHERE classoid = 'pg_largeobject'::regclass) ss");
  	else if (AH->remoteVersion >= 70200)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 						  "obj_description(loid, 'pg_largeobject') "
! 						  "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss");
  	else if (AH->remoteVersion >= 70100)
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT loid, "
! 						  "obj_description(loid) "
! 						  "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss");
  	else
! 		appendPQExpBuffer(blobQry,
! 						  "DECLARE blobcmt CURSOR FOR SELECT oid, "
! 						  "	( "
! 						  "		SELECT description "
! 						  "		FROM pg_description pd "
! 						  "		WHERE pd.objoid=pc.oid "
! 						  "	) "
! 						  "FROM pg_class pc WHERE relkind = 'l'");
! 
! 	res = PQexec(g_conn, blobQry->data);
! 	check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
  
  	do
  	{
***************
*** 2127,2175 **** dumpBlobComments(Archive *AH, void *arg)
  		{
  			Oid			blobOid = atooid(PQgetvalue(res, i, 0));
  			char	   *lo_comment = PQgetvalue(res, i, 1);
! 			char	   *lo_owner = PQgetvalue(res, i, 2);
! 			char	   *lo_acl = PQgetvalue(res, i, 3);
! 			char		lo_name[32];
  
  			resetPQExpBuffer(cmdQry);
  
! 			/* comment on the blob */
! 			if (!PQgetisnull(res, i, 1))
! 			{
! 				appendPQExpBuffer(cmdQry,
! 								  "COMMENT ON LARGE OBJECT %u IS ", blobOid);
! 				appendStringLiteralAH(cmdQry, lo_comment, AH);
! 				appendPQExpBuffer(cmdQry, ";\n");
! 			}
  
! 			/* dump blob ownership, if necessary */
! 			if (!PQgetisnull(res, i, 2))
! 			{
! 				appendPQExpBuffer(cmdQry,
! 								  "ALTER LARGE OBJECT %u OWNER TO %s;\n",
! 								  blobOid, lo_owner);
! 			}
  
! 			/* dump blob privileges, if necessary */
! 			if (!PQgetisnull(res, i, 3) &&
! 				!dataOnly && !aclsSkip)
! 			{
! 				snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
! 				if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
! 									  lo_acl, lo_owner, "",
! 									  AH->remoteVersion, cmdQry))
! 				{
! 					write_msg(NULL, "could not parse ACL (%s) for "
! 							  "large object %u", lo_acl, blobOid);
! 					exit_nicely();
! 				}
! 			}
  
! 			if (cmdQry->len > 0)
  			{
! 				appendPQExpBuffer(cmdQry, "\n");
! 				archputs(cmdQry->data, AH);
  			}
  		}
  	} while (PQntuples(res) > 0);
  
--- 2156,2256 ----
  		{
  			Oid			blobOid = atooid(PQgetvalue(res, i, 0));
  			char	   *lo_comment = PQgetvalue(res, i, 1);
! 
! 			/* comment on the blob, if necessary */
! 			if (PQgetisnull(res, i, 1))
! 				continue;
  
  			resetPQExpBuffer(cmdQry);
  
! 			appendPQExpBuffer(cmdQry,
! 							  "COMMENT ON LARGE OBJECT %u IS ", blobOid);
! 			appendStringLiteralAH(cmdQry, lo_comment, AH);
! 			appendPQExpBuffer(cmdQry, ";\n\n");
  
! 			archputs(cmdQry->data, AH);
! 		}
! 	} while (PQntuples(res) > 0);
  
! 	PQclear(res);
! 
! 	archputs("\n", AH);
! 
! 	destroyPQExpBuffer(cmdQry);
! 
! 	/* Cleanup cursor */
! 	res = PQexec(g_conn, blobCloseQry);
! 	check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
! 
! 	return 1;
! }
! 
! /*
!  * dumpBlobAcls
!  *	dump all blob privileges.
!  *
!  * Since we don't provide any way to be selective about dumping blobs,
!  * there's no need to be selective about their privileges either.
!  * We put all the privileges into one big TOC entry.
!  */
! static int
! dumpBlobAcls(Archive *AH, void *arg)
! {
! 	char	   *rolname = ((BlobOwnerInfo *)arg)->rolname;
! 	PQExpBuffer	cmdQry = createPQExpBuffer();
! 	PQExpBuffer blobQry = createPQExpBuffer();
! 	const char *blobFetchQry = "FETCH 100 IN blobacl";
! 	const char *blobCloseQry = "CLOSE blobacl";
! 	PGresult   *res;
! 	int			i;
! 
! 	if (g_verbose)
! 		write_msg(NULL, "saving large object permissions\n");
! 
! 	/* Cursor to get all BLOB permissions */
! 	if (AH->remoteVersion >= 80500)
! 		appendPQExpBuffer(blobQry, "DECLARE blobacl CURSOR FOR "
! 						  "SELECT oid, lomacl FROM pg_largeobject_metadata "
! 						  "WHERE '%s' in (%s lomowner)",
! 						  rolname, username_subquery);
! 	else
! 		return 1;	/* no need to dump anything < 8.5 */
! 
! 	/* Make sure we are in proper schema */
!     selectSourceSchema("pg_catalog");
! 
! 	/* Open cursor */
! 	res = PQexec(g_conn, blobQry->data);
!     check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
! 
! 	do
! 	{
! 		PQclear(res);
! 
! 		/* Do a fetch */
! 		res = PQexec(g_conn, blobFetchQry);
! 		check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
! 
! 		/* Process the tuples, if any */
! 		for (i = 0; i < PQntuples(res); i++)
! 		{
! 			char   *lo_oid = PQgetvalue(res, i, 0);
! 			char   *lo_acl = PQgetvalue(res, i, 1);
  
! 			if (PQgetisnull(res, i, 1))
! 				continue;
! 
! 			resetPQExpBuffer(cmdQry);
! 
! 			if (!buildACLCommands(lo_oid, NULL, "LARGE OBJECT",
! 								  lo_acl, rolname, "",
! 								  AH->remoteVersion, cmdQry))
  			{
! 				write_msg(NULL, "could not parse ACL (%s) for "
! 						  "large object %s", lo_acl, lo_oid);
! 				exit_nicely();
  			}
+ 			archprintf(AH, "%s\n", cmdQry->data);
  		}
  	} while (PQntuples(res) > 0);
  
***************
*** 2179,2184 **** dumpBlobComments(Archive *AH, void *arg)
--- 2260,2269 ----
  
  	destroyPQExpBuffer(cmdQry);
  
+ 	/* Cleanup */
+ 	res = PQexec(g_conn, blobCloseQry);
+ 	check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
+ 
  	return 1;
  }
  
***************
*** 6480,6498 **** dumpDumpableObject(Archive *fout, DumpableObject *dobj)
  			break;
  		case DO_BLOBS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL, "",
  						 false, "BLOBS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobs, NULL);
  			break;
  		case DO_BLOB_COMMENTS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL, "",
  						 false, "BLOB COMMENTS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobComments, NULL);
  			break;
  	}
  }
--- 6565,6594 ----
  			break;
  		case DO_BLOBS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL,
! 						 ((BlobOwnerInfo *) dobj)->rolname,
  						 false, "BLOBS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobs, dobj);
  			break;
  		case DO_BLOB_COMMENTS:
  			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL,
! 						 ((BlobOwnerInfo *) dobj)->rolname,
  						 false, "BLOB COMMENTS", SECTION_DATA,
  						 "", "", NULL,
  						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobComments, dobj);
! 			break;
! 		case DO_BLOB_ACLS:
! 			ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! 						 dobj->name, NULL, NULL,
! 						 ((BlobOwnerInfo *) dobj)->rolname,
! 						 false, "BLOB ACLS", SECTION_DATA,
! 						 "", "", NULL,
! 						 dobj->dependencies, dobj->nDeps,
! 						 dumpBlobAcls, dobj);
  			break;
  	}
  }
*** a/src/bin/pg_dump/pg_dump.h
--- b/src/bin/pg_dump/pg_dump.h
***************
*** 116,122 **** typedef enum
  	DO_FOREIGN_SERVER,
  	DO_DEFAULT_ACL,
  	DO_BLOBS,
! 	DO_BLOB_COMMENTS
  } DumpableObjectType;
  
  typedef struct _dumpableObject
--- 116,123 ----
  	DO_FOREIGN_SERVER,
  	DO_DEFAULT_ACL,
  	DO_BLOBS,
! 	DO_BLOB_COMMENTS,
! 	DO_BLOB_ACLS,
  } DumpableObjectType;
  
  typedef struct _dumpableObject
***************
*** 442,447 **** typedef struct _defaultACLInfo
--- 443,454 ----
  	char	   *defaclacl;
  } DefaultACLInfo;
  
+ typedef struct _blobOwnerInfo
+ {
+ 	DumpableObject dobj;
+ 	char	   *rolname;
+ } BlobOwnerInfo;
+ 
  /* global decls */
  extern bool force_quotes;		/* double-quotes for identifiers flag */
  extern bool g_verbose;			/* verbose flag */
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
***************
*** 93,99 **** static const int newObjectTypePriority[] =
  	15,							/* DO_FOREIGN_SERVER */
  	27,							/* DO_DEFAULT_ACL */
  	20,							/* DO_BLOBS */
! 	21							/* DO_BLOB_COMMENTS */
  };
  
  
--- 93,100 ----
  	15,							/* DO_FOREIGN_SERVER */
  	27,							/* DO_DEFAULT_ACL */
  	20,							/* DO_BLOBS */
! 	21,							/* DO_BLOB_COMMENTS */
! 	28,							/* DO_BLOB_ACLS */
  };
  
  
***************
*** 1156,1161 **** describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
--- 1157,1167 ----
  					 "BLOB COMMENTS  (ID %d)",
  					 obj->dumpId);
  			return;
+ 		case DO_BLOB_ACLS:
+ 			snprintf(buf, bufsize,
+ 					 "BLOB ACLS  (ID %d)",
+ 					 obj->dumpId);
+ 			return;
  	}
  	/* shouldn't get here */
  	snprintf(buf, bufsize,
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to