The attached patch is a revised version.
List of updates:
- cleanup: getBlobs() was renamed to getBlobOwners()
- cleanup: BlobsInfo was renamed to BlobOwnerInfo
- bugfix: pg_get_userbyid() in SQLs were replaced by username_subquery which
constins a right subquery to obtain a username for the given id.
It enables to run pg_dump under the concurrent role deletion.
- bugfix: Even if we give -a (--data-only) or -O (--no-owner) options,
or archive does not contain Owner information, it tried to write
out "ALTER LARGE OBJECT xxx OWNER TO ..." statement.
- bugfix: Even if we give -a (--data-only) or -x (--no-privileges) options,
it tried to write out "BLOB ACLS" section.
The last two are the problems I noticed newly. It needs to introduce them.
The BLOB section can contain multiple definitions of large objects, unlike
any other object classes. It is also a reason why I had to group large
objects by database user.
The Owner tag of BLOB section is used to identify owner of the large objects
to be restored, and also used in --use-set-session-authorization mode.
However, we need to inject "ALTER LARGE OBJECT xxx OWNER TO ..." statement
for each lo_create() in _LoadBlobs(), because we cannot know how many large
objects are in the section before reading the archive.
But the last patch didn't pay mention about -a/-O option and an archive
which does not have Owner: tag.
The BLOB ACLS section is categorized to SECTION_DATA, it follows the manner
in BLOB COMMENTS behavior. In same reason, it has to handle the -a/-x option
by itself, but the last patch didn't handle it well.
BTW, here is a known issue. When we run pg_dump with -s(--schema-only),
it write out descriptions of regular object classes, but does not write
out the description of large objects.
It seems to me the description of large objects are considered as a part
of data, not properties. However, it might be inconsistent.
The reason of this behavior is all the BLOB dumps are categorized to
SECTION_DATA, so -s option informs pg_backup_archiver.c to skip routines
related to large objects.
However, it may be a time to consider this code into two steps.
In the schema section stage,
- It creates empty large objects with lo_create()
- It restores the description of the large objects.
- It restores the ownership/privileges of the large objects.
In the date section stage,
- It loads actual data contents into the empty large objects with lowrite().
Thanks,
(2010/01/21 19:42), Takahiro Itagaki wrote:
>
> KaiGai Kohei<[email protected]> wrote:
>
>>> I'm not sure whether we need to make groups for each owner of large objects.
>>> If I remember right, the primary issue was separating routines for dump
>>> BLOB ACLS from routines for BLOB COMMENTS, right? Why did you make the
>>> change?
>>
>> When --use-set-session-authorization is specified, pg_restore tries to
>> change database role of the current session just before creation of
>> database objects to be restored.
>>
>> Ownership of the database objects are recorded in the section header,
>> and it informs pg_restore who should be owner of the objects to be
>> restored in this section.
>>
>> Then, pg_restore can generate ALTER xxx OWNER TO after creation, or
>> SET SESSION AUTHORIZATION before creation in runtime.
>> So, we cannot put creation of largeobjects with different ownership
>> in same section.
>>
>> It is the reason why we have to group largeobjects by database user.
>
> Ah, I see.
>
> Then... What happen if we drop or rename roles who have large objects
> during pg_dump? Does the patch still work? It uses pg_get_userbyid().
>
> Regards,
> ---
> Takahiro Itagaki
> NTT Open Source Software Center
>
>
>
--
OSS Platform Development Division, NEC
KaiGai Kohei <[email protected]>
*** a/src/bin/pg_dump/pg_backup_archiver.c
--- b/src/bin/pg_dump/pg_backup_archiver.c
***************
*** 517,527 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
*/
if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
{
- _printTocEntry(AH, te, ropt, true, false);
-
if (strcmp(te->desc, "BLOBS") == 0 ||
! strcmp(te->desc, "BLOB COMMENTS") == 0)
{
ahlog(AH, 1, "restoring %s\n", te->desc);
_selectOutputSchema(AH, "pg_catalog");
--- 517,535 ----
*/
if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0)
{
if (strcmp(te->desc, "BLOBS") == 0 ||
! strcmp(te->desc, "BLOB COMMENTS") == 0 ||
! strcmp(te->desc, "BLOB ACLS") == 0)
{
+ /*
+ * We don't need to dump ACLs, if -a or -x cases.
+ */
+ if (strcmp(te->desc, "BLOB ACLS") == 0 &&
+ (ropt->aclsSkip || ropt->dataOnly))
+ return retval;
+
+ _printTocEntry(AH, te, ropt, true, false);
+
ahlog(AH, 1, "restoring %s\n", te->desc);
_selectOutputSchema(AH, "pg_catalog");
***************
*** 530,535 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te,
--- 538,545 ----
}
else
{
+ _printTocEntry(AH, te, ropt, true, false);
+
_disableTriggersIfNecessary(AH, te, ropt);
/* Select owner and schema as necessary */
*** a/src/bin/pg_dump/pg_backup_custom.c
--- b/src/bin/pg_dump/pg_backup_custom.c
***************
*** 54,60 **** static void _StartBlobs(ArchiveHandle *AH, TocEntry *te);
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
! static void _LoadBlobs(ArchiveHandle *AH, bool drop);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
--- 54,60 ----
static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid);
static void _EndBlobs(ArchiveHandle *AH, TocEntry *te);
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _Clone(ArchiveHandle *AH);
static void _DeClone(ArchiveHandle *AH);
***************
*** 498,504 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
break;
case BLK_BLOBS:
! _LoadBlobs(AH, ropt->dropSchema);
break;
default: /* Always have a default */
--- 498,504 ----
break;
case BLK_BLOBS:
! _LoadBlobs(AH, te, ropt);
break;
default: /* Always have a default */
***************
*** 619,625 **** _PrintData(ArchiveHandle *AH)
}
static void
! _LoadBlobs(ArchiveHandle *AH, bool drop)
{
Oid oid;
--- 619,625 ----
}
static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
Oid oid;
***************
*** 628,636 **** _LoadBlobs(ArchiveHandle *AH, bool drop)
oid = ReadInt(AH);
while (oid != 0)
{
! StartRestoreBlob(AH, oid, drop);
_PrintData(AH);
EndRestoreBlob(AH, oid);
oid = ReadInt(AH);
}
--- 628,640 ----
oid = ReadInt(AH);
while (oid != 0)
{
! StartRestoreBlob(AH, oid, ropt->dropSchema);
_PrintData(AH);
EndRestoreBlob(AH, oid);
+ if (!ropt->noOwner && !ropt->dataOnly &&
+ !ropt->use_setsessauth && strlen(te->owner) > 0)
+ ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ oid, te->owner);
oid = ReadInt(AH);
}
*** a/src/bin/pg_dump/pg_backup_files.c
--- b/src/bin/pg_dump/pg_backup_files.c
***************
*** 66,72 **** typedef struct
} lclTocEntry;
static const char *modulename = gettext_noop("file archiver");
! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname);
/*
--- 66,72 ----
} lclTocEntry;
static const char *modulename = gettext_noop("file archiver");
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname);
/*
***************
*** 330,336 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
return;
if (strcmp(te->desc, "BLOBS") == 0)
! _LoadBlobs(AH, ropt);
else
_PrintFileData(AH, tctx->filename, ropt);
}
--- 330,336 ----
return;
if (strcmp(te->desc, "BLOBS") == 0)
! _LoadBlobs(AH, te, ropt);
else
_PrintFileData(AH, tctx->filename, ropt);
}
***************
*** 365,371 **** _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char fname[K_STD_BUF_SIZE])
}
static void
! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
--- 365,371 ----
}
static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
***************
*** 385,390 **** _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
--- 385,394 ----
StartRestoreBlob(AH, oid, ropt->dropSchema);
_PrintFileData(AH, fname, ropt);
EndRestoreBlob(AH, oid);
+ if (!ropt->noOwner && !ropt->dataOnly &&
+ !ropt->use_setsessauth && strlen(te->owner) > 0)
+ ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ oid, te->owner);
_getBlobTocEntry(AH, &oid, fname);
}
*** a/src/bin/pg_dump/pg_backup_tar.c
--- b/src/bin/pg_dump/pg_backup_tar.c
***************
*** 100,106 **** typedef struct
static const char *modulename = gettext_noop("tar archiver");
! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt);
static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH);
--- 100,106 ----
static const char *modulename = gettext_noop("tar archiver");
! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt);
static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode);
static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH);
***************
*** 696,708 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
}
if (strcmp(te->desc, "BLOBS") == 0)
! _LoadBlobs(AH, ropt);
else
_PrintFileData(AH, tctx->filename, ropt);
}
static void
! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
--- 696,708 ----
}
if (strcmp(te->desc, "BLOBS") == 0)
! _LoadBlobs(AH, te, ropt);
else
_PrintFileData(AH, tctx->filename, ropt);
}
static void
! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt)
{
Oid oid;
lclContext *ctx = (lclContext *) AH->formatData;
***************
*** 733,738 **** _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt)
--- 733,742 ----
ahwrite(buf, 1, cnt, AH);
}
EndRestoreBlob(AH, oid);
+ if (!ropt->noOwner && !ropt->dataOnly &&
+ !ropt->use_setsessauth && strlen(te->owner) > 0)
+ ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n",
+ oid, te->owner);
foundBlob = true;
}
tarClose(AH, th);
*** a/src/bin/pg_dump/pg_dump.c
--- b/src/bin/pg_dump/pg_dump.c
***************
*** 190,198 **** static void selectSourceSchema(const char *schemaName);
static char *getFormattedTypeName(Oid oid, OidOptions opts);
static char *myFormatType(const char *typname, int32 typmod);
static const char *fmtQualifiedId(const char *schema, const char *id);
! static bool hasBlobs(Archive *AH);
static int dumpBlobs(Archive *AH, void *arg);
static int dumpBlobComments(Archive *AH, void *arg);
static void dumpDatabase(Archive *AH);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
--- 190,199 ----
static char *getFormattedTypeName(Oid oid, OidOptions opts);
static char *myFormatType(const char *typname, int32 typmod);
static const char *fmtQualifiedId(const char *schema, const char *id);
! static void getBlobOwners(Archive *AH);
static int dumpBlobs(Archive *AH, void *arg);
static int dumpBlobComments(Archive *AH, void *arg);
+ static int dumpBlobAcls(Archive *AH, void *arg);
static void dumpDatabase(Archive *AH);
static void dumpEncoding(Archive *AH);
static void dumpStdStrings(Archive *AH);
***************
*** 701,725 **** main(int argc, char **argv)
getTableDataFKConstraints();
}
! if (outputBlobs && hasBlobs(g_fout))
! {
! /* Add placeholders to allow correct sorting of blobs */
! DumpableObject *blobobj;
! DumpableObject *blobcobj;
!
! blobobj = (DumpableObject *) malloc(sizeof(DumpableObject));
! blobobj->objType = DO_BLOBS;
! blobobj->catId = nilCatalogId;
! AssignDumpId(blobobj);
! blobobj->name = strdup("BLOBS");
!
! blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject));
! blobcobj->objType = DO_BLOB_COMMENTS;
! blobcobj->catId = nilCatalogId;
! AssignDumpId(blobcobj);
! blobcobj->name = strdup("BLOB COMMENTS");
! addObjectDependency(blobcobj, blobobj->dumpId);
! }
/*
* Collect dependency data to assist in ordering the objects.
--- 702,709 ----
getTableDataFKConstraints();
}
! if (outputBlobs)
! getBlobOwners(g_fout);
/*
* Collect dependency data to assist in ordering the objects.
***************
*** 1938,1972 **** dumpStdStrings(Archive *AH)
/*
! * hasBlobs:
* Test whether database contains any large objects
*/
! static bool
! hasBlobs(Archive *AH)
{
! bool result;
! const char *blobQry;
! PGresult *res;
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
/* Check for BLOB OIDs */
if (AH->remoteVersion >= 80500)
! blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1";
else if (AH->remoteVersion >= 70100)
! blobQry = "SELECT loid FROM pg_largeobject LIMIT 1";
else
! blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1";
! res = PQexec(g_conn, blobQry);
! check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK);
! result = PQntuples(res) > 0;
PQclear(res);
-
- return result;
}
/*
--- 1922,1990 ----
/*
! * getBlobOwners:
* Test whether database contains any large objects
+ * If exist, it adds BlobOwnerInfo objects for each owners
*/
! static void
! getBlobOwners(Archive *AH)
{
! PQExpBuffer blobQry = createPQExpBuffer();
! BlobOwnerInfo *blobobj;
! BlobOwnerInfo *blobcobj;
! BlobOwnerInfo *blobaobj;
! PGresult *res;
! int i;
/* Make sure we are in proper schema */
selectSourceSchema("pg_catalog");
/* Check for BLOB OIDs */
if (AH->remoteVersion >= 80500)
! appendPQExpBuffer(blobQry,
! "SELECT DISTINCT (%s lomowner)"
! " FROM pg_largeobject_metadata",
! username_subquery);
else if (AH->remoteVersion >= 70100)
! appendPQExpBuffer(blobQry,
! "SELECT NULL FROM pg_largeobject LIMIT 1");
else
! appendPQExpBuffer(blobQry,
! "SELECT NULL FROM pg_class WHERE relkind = 'l' LIMIT 1");
! res = PQexec(g_conn, blobQry->data);
! check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK);
! for (i = 0; i < PQntuples(res); i++)
! {
! blobobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo));
! blobobj->dobj.objType = DO_BLOBS;
! blobobj->dobj.catId = nilCatalogId;
! AssignDumpId(&blobobj->dobj);
! blobobj->dobj.name = strdup("BLOBS");
! blobobj->rolname = strdup(PQgetvalue(res, i, 0));
!
! blobcobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo));
! blobcobj->dobj.objType = DO_BLOB_COMMENTS;
! blobcobj->dobj.catId = nilCatalogId;
! AssignDumpId(&blobcobj->dobj);
! blobcobj->dobj.name = strdup("BLOB COMMENTS");
! blobcobj->rolname = strdup(PQgetvalue(res, i, 0));
! addObjectDependency(&blobcobj->dobj, blobobj->dobj.dumpId);
!
! if (AH->remoteVersion >= 80500 && !dataOnly && !aclsSkip)
! {
! blobaobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo));
! blobaobj->dobj.objType = DO_BLOB_ACLS;
! blobaobj->dobj.catId = nilCatalogId;
! AssignDumpId(&blobaobj->dobj);
! blobaobj->dobj.name = strdup("BLOB ACLS");
! blobaobj->rolname = strdup(PQgetvalue(res, i, 0));
! addObjectDependency(&blobaobj->dobj, blobobj->dobj.dumpId);
! }
! }
PQclear(res);
}
/*
***************
*** 1976,1983 **** hasBlobs(Archive *AH)
static int
dumpBlobs(Archive *AH, void *arg)
{
! const char *blobQry;
! const char *blobFetchQry;
PGresult *res;
char buf[LOBBUFSIZE];
int i;
--- 1994,2003 ----
static int
dumpBlobs(Archive *AH, void *arg)
{
! char *rolname = ((BlobOwnerInfo *)arg)->rolname;
! PQExpBuffer blobQry = createPQExpBuffer();
! const char *blobFetchQry = "FETCH 1000 IN bloboid";
! const char *blobCloseQry = "CLOSE bloboid";
PGresult *res;
char buf[LOBBUFSIZE];
int i;
***************
*** 1991,2007 **** dumpBlobs(Archive *AH, void *arg)
/* Cursor to get all BLOB OIDs */
if (AH->remoteVersion >= 80500)
! blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata";
else if (AH->remoteVersion >= 70100)
! blobQry = "DECLARE bloboid CURSOR FOR SELECT DISTINCT loid FROM pg_largeobject";
else
! blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_class WHERE relkind = 'l'";
! res = PQexec(g_conn, blobQry);
! check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
!
! /* Command to fetch from cursor */
! blobFetchQry = "FETCH 1000 IN bloboid";
do
{
--- 2011,2029 ----
/* Cursor to get all BLOB OIDs */
if (AH->remoteVersion >= 80500)
! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! "SELECT oid, lomacl FROM pg_largeobject_metadata "
! "WHERE '%s' in (%s lomowner)",
! rolname, username_subquery);
else if (AH->remoteVersion >= 70100)
! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! "SELECT DISTINCT loid, NULL FROM pg_largeobject");
else
! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR "
! "SELECT oid, NULL FROM pg_class WHERE relkind = 'l'");
! res = PQexec(g_conn, blobQry->data);
! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
do
{
***************
*** 2051,2064 **** dumpBlobs(Archive *AH, void *arg)
PQclear(res);
return 1;
}
/*
* dumpBlobComments
! * dump all blob properties.
! * It has "BLOB COMMENTS" tag due to the historical reason, but note
! * that it is the routine to dump all the properties of blobs.
*
* Since we don't provide any way to be selective about dumping blobs,
* there's no need to be selective about their comments either. We put
--- 2073,2088 ----
PQclear(res);
+ /* Cleanup cursor */
+ res = PQexec(g_conn, blobCloseQry);
+ check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
+
return 1;
}
/*
* dumpBlobComments
! * dump all blob comments.
*
* Since we don't provide any way to be selective about dumping blobs,
* there's no need to be selective about their comments either. We put
***************
*** 2067,2075 **** dumpBlobs(Archive *AH, void *arg)
static int
dumpBlobComments(Archive *AH, void *arg)
{
! const char *blobQry;
! const char *blobFetchQry;
PQExpBuffer cmdQry = createPQExpBuffer();
PGresult *res;
int i;
--- 2091,2101 ----
static int
dumpBlobComments(Archive *AH, void *arg)
{
! char *rolname = ((BlobOwnerInfo *)arg)->rolname;
PQExpBuffer cmdQry = createPQExpBuffer();
+ PQExpBuffer blobQry = createPQExpBuffer();
+ const char *blobFetchQry = "FETCH 100 IN blobcmt";
+ const char *blobCloseQry = "CLOSE blobcmt";
PGresult *res;
int i;
***************
*** 2081,2118 **** dumpBlobComments(Archive *AH, void *arg)
/* Cursor to get all BLOB comments */
if (AH->remoteVersion >= 80500)
! blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
! "obj_description(oid, 'pg_largeobject'), "
! "pg_get_userbyid(lomowner), lomacl "
! "FROM pg_largeobject_metadata";
else if (AH->remoteVersion >= 70300)
! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! "obj_description(loid, 'pg_largeobject'), NULL, NULL "
! "FROM (SELECT DISTINCT loid FROM "
! "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
! "WHERE classoid = 'pg_largeobject'::regclass) ss";
else if (AH->remoteVersion >= 70200)
! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! "obj_description(loid, 'pg_largeobject'), NULL, NULL "
! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
else if (AH->remoteVersion >= 70100)
! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, "
! "obj_description(loid), NULL, NULL "
! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss";
else
! blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, "
! " ( "
! " SELECT description "
! " FROM pg_description pd "
! " WHERE pd.objoid=pc.oid "
! " ), NULL, NULL "
! "FROM pg_class pc WHERE relkind = 'l'";
!
! res = PQexec(g_conn, blobQry);
! check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK);
!
! /* Command to fetch from cursor */
! blobFetchQry = "FETCH 100 IN blobcmt";
do
{
--- 2107,2147 ----
/* Cursor to get all BLOB comments */
if (AH->remoteVersion >= 80500)
! appendPQExpBuffer(blobQry,
! "DECLARE blobcmt CURSOR FOR SELECT oid, "
! "obj_description(oid, 'pg_largeobject') "
! "FROM pg_largeobject_metadata "
! "WHERE '%s' in (%s lomowner)",
! rolname, username_subquery);
else if (AH->remoteVersion >= 70300)
! appendPQExpBuffer(blobQry,
! "DECLARE blobcmt CURSOR FOR SELECT loid, "
! "obj_description(loid, 'pg_largeobject') "
! "FROM (SELECT DISTINCT loid FROM "
! "pg_description d JOIN pg_largeobject l ON (objoid = loid) "
! "WHERE classoid = 'pg_largeobject'::regclass) ss");
else if (AH->remoteVersion >= 70200)
! appendPQExpBuffer(blobQry,
! "DECLARE blobcmt CURSOR FOR SELECT loid, "
! "obj_description(loid, 'pg_largeobject') "
! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss");
else if (AH->remoteVersion >= 70100)
! appendPQExpBuffer(blobQry,
! "DECLARE blobcmt CURSOR FOR SELECT loid, "
! "obj_description(loid) "
! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss");
else
! appendPQExpBuffer(blobQry,
! "DECLARE blobcmt CURSOR FOR SELECT oid, "
! " ( "
! " SELECT description "
! " FROM pg_description pd "
! " WHERE pd.objoid=pc.oid "
! " ) "
! "FROM pg_class pc WHERE relkind = 'l'");
!
! res = PQexec(g_conn, blobQry->data);
! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
do
{
***************
*** 2127,2175 **** dumpBlobComments(Archive *AH, void *arg)
{
Oid blobOid = atooid(PQgetvalue(res, i, 0));
char *lo_comment = PQgetvalue(res, i, 1);
! char *lo_owner = PQgetvalue(res, i, 2);
! char *lo_acl = PQgetvalue(res, i, 3);
! char lo_name[32];
resetPQExpBuffer(cmdQry);
! /* comment on the blob */
! if (!PQgetisnull(res, i, 1))
! {
! appendPQExpBuffer(cmdQry,
! "COMMENT ON LARGE OBJECT %u IS ", blobOid);
! appendStringLiteralAH(cmdQry, lo_comment, AH);
! appendPQExpBuffer(cmdQry, ";\n");
! }
! /* dump blob ownership, if necessary */
! if (!PQgetisnull(res, i, 2))
! {
! appendPQExpBuffer(cmdQry,
! "ALTER LARGE OBJECT %u OWNER TO %s;\n",
! blobOid, lo_owner);
! }
! /* dump blob privileges, if necessary */
! if (!PQgetisnull(res, i, 3) &&
! !dataOnly && !aclsSkip)
! {
! snprintf(lo_name, sizeof(lo_name), "%u", blobOid);
! if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT",
! lo_acl, lo_owner, "",
! AH->remoteVersion, cmdQry))
! {
! write_msg(NULL, "could not parse ACL (%s) for "
! "large object %u", lo_acl, blobOid);
! exit_nicely();
! }
! }
! if (cmdQry->len > 0)
{
! appendPQExpBuffer(cmdQry, "\n");
! archputs(cmdQry->data, AH);
}
}
} while (PQntuples(res) > 0);
--- 2156,2256 ----
{
Oid blobOid = atooid(PQgetvalue(res, i, 0));
char *lo_comment = PQgetvalue(res, i, 1);
!
! /* comment on the blob, if necessary */
! if (PQgetisnull(res, i, 1))
! continue;
resetPQExpBuffer(cmdQry);
! appendPQExpBuffer(cmdQry,
! "COMMENT ON LARGE OBJECT %u IS ", blobOid);
! appendStringLiteralAH(cmdQry, lo_comment, AH);
! appendPQExpBuffer(cmdQry, ";\n\n");
! archputs(cmdQry->data, AH);
! }
! } while (PQntuples(res) > 0);
! PQclear(res);
!
! archputs("\n", AH);
!
! destroyPQExpBuffer(cmdQry);
!
! /* Cleanup cursor */
! res = PQexec(g_conn, blobCloseQry);
! check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
!
! return 1;
! }
!
! /*
! * dumpBlobAcls
! * dump all blob privileges.
! *
! * Since we don't provide any way to be selective about dumping blobs,
! * there's no need to be selective about their privileges either.
! * We put all the privileges into one big TOC entry.
! */
! static int
! dumpBlobAcls(Archive *AH, void *arg)
! {
! char *rolname = ((BlobOwnerInfo *)arg)->rolname;
! PQExpBuffer cmdQry = createPQExpBuffer();
! PQExpBuffer blobQry = createPQExpBuffer();
! const char *blobFetchQry = "FETCH 100 IN blobacl";
! const char *blobCloseQry = "CLOSE blobacl";
! PGresult *res;
! int i;
!
! if (g_verbose)
! write_msg(NULL, "saving large object permissions\n");
!
! /* Cursor to get all BLOB permissions */
! if (AH->remoteVersion >= 80500)
! appendPQExpBuffer(blobQry, "DECLARE blobacl CURSOR FOR "
! "SELECT oid, lomacl FROM pg_largeobject_metadata "
! "WHERE '%s' in (%s lomowner)",
! rolname, username_subquery);
! else
! return 1; /* no need to dump anything < 8.5 */
!
! /* Make sure we are in proper schema */
! selectSourceSchema("pg_catalog");
!
! /* Open cursor */
! res = PQexec(g_conn, blobQry->data);
! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK);
!
! do
! {
! PQclear(res);
!
! /* Do a fetch */
! res = PQexec(g_conn, blobFetchQry);
! check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK);
!
! /* Process the tuples, if any */
! for (i = 0; i < PQntuples(res); i++)
! {
! char *lo_oid = PQgetvalue(res, i, 0);
! char *lo_acl = PQgetvalue(res, i, 1);
! if (PQgetisnull(res, i, 1))
! continue;
!
! resetPQExpBuffer(cmdQry);
!
! if (!buildACLCommands(lo_oid, NULL, "LARGE OBJECT",
! lo_acl, rolname, "",
! AH->remoteVersion, cmdQry))
{
! write_msg(NULL, "could not parse ACL (%s) for "
! "large object %s", lo_acl, lo_oid);
! exit_nicely();
}
+ archprintf(AH, "%s\n", cmdQry->data);
}
} while (PQntuples(res) > 0);
***************
*** 2179,2184 **** dumpBlobComments(Archive *AH, void *arg)
--- 2260,2269 ----
destroyPQExpBuffer(cmdQry);
+ /* Cleanup */
+ res = PQexec(g_conn, blobCloseQry);
+ check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK);
+
return 1;
}
***************
*** 6480,6498 **** dumpDumpableObject(Archive *fout, DumpableObject *dobj)
break;
case DO_BLOBS:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! dobj->name, NULL, NULL, "",
false, "BLOBS", SECTION_DATA,
"", "", NULL,
dobj->dependencies, dobj->nDeps,
! dumpBlobs, NULL);
break;
case DO_BLOB_COMMENTS:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! dobj->name, NULL, NULL, "",
false, "BLOB COMMENTS", SECTION_DATA,
"", "", NULL,
dobj->dependencies, dobj->nDeps,
! dumpBlobComments, NULL);
break;
}
}
--- 6565,6594 ----
break;
case DO_BLOBS:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! dobj->name, NULL, NULL,
! ((BlobOwnerInfo *) dobj)->rolname,
false, "BLOBS", SECTION_DATA,
"", "", NULL,
dobj->dependencies, dobj->nDeps,
! dumpBlobs, dobj);
break;
case DO_BLOB_COMMENTS:
ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! dobj->name, NULL, NULL,
! ((BlobOwnerInfo *) dobj)->rolname,
false, "BLOB COMMENTS", SECTION_DATA,
"", "", NULL,
dobj->dependencies, dobj->nDeps,
! dumpBlobComments, dobj);
! break;
! case DO_BLOB_ACLS:
! ArchiveEntry(fout, dobj->catId, dobj->dumpId,
! dobj->name, NULL, NULL,
! ((BlobOwnerInfo *) dobj)->rolname,
! false, "BLOB ACLS", SECTION_DATA,
! "", "", NULL,
! dobj->dependencies, dobj->nDeps,
! dumpBlobAcls, dobj);
break;
}
}
*** a/src/bin/pg_dump/pg_dump.h
--- b/src/bin/pg_dump/pg_dump.h
***************
*** 116,122 **** typedef enum
DO_FOREIGN_SERVER,
DO_DEFAULT_ACL,
DO_BLOBS,
! DO_BLOB_COMMENTS
} DumpableObjectType;
typedef struct _dumpableObject
--- 116,123 ----
DO_FOREIGN_SERVER,
DO_DEFAULT_ACL,
DO_BLOBS,
! DO_BLOB_COMMENTS,
! DO_BLOB_ACLS,
} DumpableObjectType;
typedef struct _dumpableObject
***************
*** 442,447 **** typedef struct _defaultACLInfo
--- 443,454 ----
char *defaclacl;
} DefaultACLInfo;
+ typedef struct _blobOwnerInfo
+ {
+ DumpableObject dobj;
+ char *rolname;
+ } BlobOwnerInfo;
+
/* global decls */
extern bool force_quotes; /* double-quotes for identifiers flag */
extern bool g_verbose; /* verbose flag */
*** a/src/bin/pg_dump/pg_dump_sort.c
--- b/src/bin/pg_dump/pg_dump_sort.c
***************
*** 93,99 **** static const int newObjectTypePriority[] =
15, /* DO_FOREIGN_SERVER */
27, /* DO_DEFAULT_ACL */
20, /* DO_BLOBS */
! 21 /* DO_BLOB_COMMENTS */
};
--- 93,100 ----
15, /* DO_FOREIGN_SERVER */
27, /* DO_DEFAULT_ACL */
20, /* DO_BLOBS */
! 21, /* DO_BLOB_COMMENTS */
! 28, /* DO_BLOB_ACLS */
};
***************
*** 1156,1161 **** describeDumpableObject(DumpableObject *obj, char *buf, int bufsize)
--- 1157,1167 ----
"BLOB COMMENTS (ID %d)",
obj->dumpId);
return;
+ case DO_BLOB_ACLS:
+ snprintf(buf, bufsize,
+ "BLOB ACLS (ID %d)",
+ obj->dumpId);
+ return;
}
/* shouldn't get here */
snprintf(buf, bufsize,
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers