(2009/12/21 9:39), KaiGai Kohei wrote:
> (2009/12/19 12:05), Robert Haas wrote:
>> On Fri, Dec 18, 2009 at 9:48 PM, Tom Lane<t...@sss.pgh.pa.us> wrote:
>>> Robert Haas<robertmh...@gmail.com> writes:
>>>> Oh. This is more complicated than it appeared on the surface. It
>>>> seems that the string "BLOB COMMENTS" actually gets inserted into
>>>> custom dumps somewhere, so I'm not sure whether we can just change it.
>>>> Was this issue discussed at some point before this was committed?
>>>> Changing it would seem to require inserting some backward
>>>> compatibility code here. Another option would be to add a separate
>>>> section for "BLOB METADATA", and leave "BLOB COMMENTS" alone. Can
>>>> anyone comment on what the Right Thing To Do is here?
>>>
>>> The BLOB COMMENTS label is, or was, correct for what it contained.
>>> If this patch has usurped it to contain other things
>>
>> It has.
>>
>>> I would argue
>>> that that is seriously wrong. pg_dump already has a clear notion
>>> of how to handle ACLs for objects. ACLs for blobs ought to be
>>> made to fit into that structure, not dumped in some random place
>>> because that saved a few lines of code.
>>
>> OK. Hopefully KaiGai or Takahiro can suggest a fix.
The patch has grown larger than I expected, because large objects are
handled very differently from every other object class.

Here are three points:

1) A new BLOB ACLS section was added.

It is a single-purpose section that holds the GRANT/REVOKE statements on
large objects, and the BLOB COMMENTS section was reverted to hold only
comments.

Because we have to assume that a database may hold a massive number of
large objects, it is not reasonable to store their ACLs using dumpACL().
It chains one ACL entry per object onto the list of TOC entries, and
these are dumped afterwards. That means pg_dump might have to register a
massive number of large objects in local memory.

Currently, we also store the GRANT/REVOKE statements in the BLOB COMMENTS
section, which is confusing. Even if pg_restore is launched with the
--no-privileges option, it cannot skip the GRANT/REVOKE statements on
large objects. This fix makes it possible to distinguish the ACLs on
large objects from their other properties, and to handle them correctly.

2) The BLOBS section was split up per database user.

Currently, the BLOBS section carries no information about the owner of
the large objects to be restored. So we tried to alter their ownership in
the BLOB COMMENTS section, but that is incorrect.

The --use-set-session-authorization option requires that ownership of
objects be restored without ALTER ... OWNER TO statements. So we need to
set the correct database username in the section properties.

This patch replaces hasBlobs() with getBlobs() and changes its purpose.
It registers DO_BLOBS, DO_BLOB_COMMENTS and DO_BLOB_ACLS objects for each
owner of large objects, if necessary.
For example, if five users own large objects, getBlobs() registers one
set of TOC entries per user (five of each kind), and dumpBlobs(),
dumpBlobComments() and dumpBlobAcls() are each invoked five times, once
with each username.

3) _LoadBlobs()

For regular database object classes, _printTocEntry() can inject an
"ALTER xxx OWNER TO ..." statement for the restored object, based on the
ownership recorded in the section header.
However, we cannot use this infrastructure for large objects as-is,
because one BLOBS section can restore multiple large objects.

_LoadBlobs() is the routine that restores the large objects within a
given section. This patch modifies it to inject an
"ALTER LARGE OBJECT <loid> OWNER TO <user>" statement for each large
object, based on the ownership of the section
(unless --use-set-session-authorization is given).

$ diffstat pgsql-fix-pg_dump-blob-privs.patch
 pg_backup_archiver.c |    4
 pg_backup_custom.c   |   11 !
 pg_backup_files.c    |    9 !
 pg_backup_tar.c      |    9 !
 pg_dump.c            |  312 +++++++----!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
 pg_dump.h            |    9 !
 pg_dump_sort.c       |    8 !
 7 files changed, 68 insertions(+), 25 deletions(-), 269 modifications(!)

Thanks,
--
OSS Platform Development Division, NEC
KaiGai Kohei <kai...@ak.jp.nec.com>
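
To make points 2) and 3) a bit more concrete, here is a small standalone
sketch. It is not code from the patch: BlobRow, count_blob_owners() and
format_blob_owner_stmt() are hypothetical names used only for
illustration, and the array stands in for rows that would really come
from pg_largeobject_metadata. It shows that the number of per-owner
sections follows the number of distinct owners, and that the per-blob
ownership statement is suppressed when session authorization is used.

```c
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-in for one large object and its owner. */
typedef struct
{
	unsigned int oid;
	const char *owner;
} BlobRow;

/*
 * Count the distinct owners in rows[]. Under the scheme described in
 * point 2), each distinct owner would get its own BLOBS, BLOB COMMENTS
 * and (if applicable) BLOB ACLS entry.
 */
static int
count_blob_owners(const BlobRow *rows, int nrows)
{
	int			nowners = 0;

	for (int i = 0; i < nrows; i++)
	{
		bool		seen = false;

		for (int j = 0; j < i; j++)
		{
			if (strcmp(rows[i].owner, rows[j].owner) == 0)
			{
				seen = true;
				break;
			}
		}
		if (!seen)
			nowners++;
	}
	return nowners;
}

/*
 * Build the per-blob ownership statement from point 3). Returns the
 * number of characters written, or 0 (with an empty buffer) when
 * use_setsessauth is true and no statement should be emitted.
 * Assumption: the owner name is used as given; a real implementation
 * would probably need to quote it as an SQL identifier.
 */
static int
format_blob_owner_stmt(char *buf, size_t bufsize, unsigned int oid,
					   const char *owner, bool use_setsessauth)
{
	if (use_setsessauth)
	{
		if (bufsize > 0)
			buf[0] = '\0';
		return 0;
	}
	return snprintf(buf, bufsize,
					"ALTER LARGE OBJECT %u OWNER TO %s;", oid, owner);
}
```

With three large objects owned by two users, count_blob_owners() reports
two owners, so two sets of sections would be registered, no matter how
many large objects each user owns.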
*** base/src/bin/pg_dump/pg_dump.h (revision 2512) --- base/src/bin/pg_dump/pg_dump.h (working copy) *************** *** 116,122 **** DO_FOREIGN_SERVER, DO_DEFAULT_ACL, DO_BLOBS, ! DO_BLOB_COMMENTS } DumpableObjectType; typedef struct _dumpableObject --- 116,123 ---- DO_FOREIGN_SERVER, DO_DEFAULT_ACL, DO_BLOBS, ! DO_BLOB_COMMENTS, ! DO_BLOB_ACLS, } DumpableObjectType; typedef struct _dumpableObject *************** *** 442,447 **** --- 443,454 ---- char *defaclacl; } DefaultACLInfo; + typedef struct _blobsInfo + { + DumpableObject dobj; + char *rolname; + } BlobsInfo; + /* global decls */ extern bool force_quotes; /* double-quotes for identifiers flag */ extern bool g_verbose; /* verbose flag */ *** base/src/bin/pg_dump/pg_backup_tar.c (revision 2512) --- base/src/bin/pg_dump/pg_backup_tar.c (working copy) *************** *** 104,110 **** static const char *modulename = gettext_noop("tar archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt); static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode); static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH); --- 104,110 ---- static const char *modulename = gettext_noop("tar archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode); static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH); *************** *** 700,712 **** } if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, ropt); else _PrintFileData(AH, tctx->filename, ropt); } static void ! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; --- 700,712 ---- } if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, te, ropt); else _PrintFileData(AH, tctx->filename, ropt); } static void ! 
_LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; *************** *** 737,742 **** --- 737,745 ---- ahwrite(buf, 1, cnt, AH); } EndRestoreBlob(AH, oid); + if (!ropt->use_setsessauth) + ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n", + oid, te->owner); foundBlob = true; } tarClose(AH, th); *** base/src/bin/pg_dump/pg_dump_sort.c (revision 2512) --- base/src/bin/pg_dump/pg_dump_sort.c (working copy) *************** *** 93,99 **** 15, /* DO_FOREIGN_SERVER */ 27, /* DO_DEFAULT_ACL */ 20, /* DO_BLOBS */ ! 21 /* DO_BLOB_COMMENTS */ }; --- 93,100 ---- 15, /* DO_FOREIGN_SERVER */ 27, /* DO_DEFAULT_ACL */ 20, /* DO_BLOBS */ ! 21, /* DO_BLOB_COMMENTS */ ! 28, /* DO_BLOB_ACLS */ }; *************** *** 1156,1161 **** --- 1157,1167 ---- "BLOB COMMENTS (ID %d)", obj->dumpId); return; + case DO_BLOB_ACLS: + snprintf(buf, bufsize, + "BLOB ACLS (ID %d)", + obj->dumpId); + return; } /* shouldn't get here */ snprintf(buf, bufsize, *** base/src/bin/pg_dump/pg_backup_files.c (revision 2512) --- base/src/bin/pg_dump/pg_backup_files.c (working copy) *************** *** 66,72 **** } lclTocEntry; static const char *modulename = gettext_noop("file archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt); static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname); /* --- 66,72 ---- } lclTocEntry; static const char *modulename = gettext_noop("file archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname); /* *************** *** 330,336 **** return; if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, ropt); else _PrintFileData(AH, tctx->filename, ropt); } --- 330,336 ---- return; if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, te, ropt); else _PrintFileData(AH, tctx->filename, ropt); } *************** *** 365,371 **** } static void ! 
_LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; --- 365,371 ---- } static void ! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; *************** *** 385,390 **** --- 385,393 ---- StartRestoreBlob(AH, oid, ropt->dropSchema); _PrintFileData(AH, fname, ropt); EndRestoreBlob(AH, oid); + if (!ropt->use_setsessauth) + ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n", + oid, te->owner); _getBlobTocEntry(AH, &oid, fname); } *** base/src/bin/pg_dump/pg_backup_archiver.c (revision 2512) --- base/src/bin/pg_dump/pg_backup_archiver.c (working copy) *************** *** 519,525 **** _printTocEntry(AH, te, ropt, true, false); if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0) { ahlog(AH, 1, "restoring %s\n", te->desc); --- 519,527 ---- _printTocEntry(AH, te, ropt, true, false); if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0 || ! (!ropt->aclsSkip && ! strcmp(te->desc, "BLOB ACLS") == 0)) { ahlog(AH, 1, "restoring %s\n", te->desc); *** base/src/bin/pg_dump/pg_backup_custom.c (revision 2512) --- base/src/bin/pg_dump/pg_backup_custom.c (working copy) *************** *** 54,60 **** static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); ! static void _LoadBlobs(ArchiveHandle *AH, bool drop); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); --- 54,60 ---- static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); ! 
static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); *************** *** 498,504 **** break; case BLK_BLOBS: ! _LoadBlobs(AH, ropt->dropSchema); break; default: /* Always have a default */ --- 498,504 ---- break; case BLK_BLOBS: ! _LoadBlobs(AH, te, ropt); break; default: /* Always have a default */ *************** *** 619,625 **** } static void ! _LoadBlobs(ArchiveHandle *AH, bool drop) { Oid oid; --- 619,625 ---- } static void ! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { Oid oid; *************** *** 628,636 **** oid = ReadInt(AH); while (oid != 0) { ! StartRestoreBlob(AH, oid, drop); _PrintData(AH); EndRestoreBlob(AH, oid); oid = ReadInt(AH); } --- 628,639 ---- oid = ReadInt(AH); while (oid != 0) { ! StartRestoreBlob(AH, oid, ropt->dropSchema); _PrintData(AH); EndRestoreBlob(AH, oid); + if (!ropt->use_setsessauth) + ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n", + oid, te->owner); oid = ReadInt(AH); } *** base/src/bin/pg_dump/pg_dump.c (revision 2513) --- base/src/bin/pg_dump/pg_dump.c (working copy) *************** *** 190,198 **** static char *getFormattedTypeName(Oid oid, OidOptions opts); static char *myFormatType(const char *typname, int32 typmod); static const char *fmtQualifiedId(const char *schema, const char *id); ! static bool hasBlobs(Archive *AH); static int dumpBlobs(Archive *AH, void *arg); static int dumpBlobComments(Archive *AH, void *arg); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); --- 190,199 ---- static char *getFormattedTypeName(Oid oid, OidOptions opts); static char *myFormatType(const char *typname, int32 typmod); static const char *fmtQualifiedId(const char *schema, const char *id); ! 
static void getBlobs(Archive *AH); static int dumpBlobs(Archive *AH, void *arg); static int dumpBlobComments(Archive *AH, void *arg); + static int dumpBlobAcls(Archive *AH, void *arg); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); *************** *** 695,720 **** getTableDataFKConstraints(); } ! if (outputBlobs && hasBlobs(g_fout)) ! { ! /* Add placeholders to allow correct sorting of blobs */ ! DumpableObject *blobobj; ! DumpableObject *blobcobj; - blobobj = (DumpableObject *) malloc(sizeof(DumpableObject)); - blobobj->objType = DO_BLOBS; - blobobj->catId = nilCatalogId; - AssignDumpId(blobobj); - blobobj->name = strdup("BLOBS"); - - blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject)); - blobcobj->objType = DO_BLOB_COMMENTS; - blobcobj->catId = nilCatalogId; - AssignDumpId(blobcobj); - blobcobj->name = strdup("BLOB COMMENTS"); - addObjectDependency(blobcobj, blobobj->dumpId); - } - /* * Collect dependency data to assist in ordering the objects. */ --- 696,704 ---- getTableDataFKConstraints(); } ! if (outputBlobs) ! getBlobs(g_fout); /* * Collect dependency data to assist in ordering the objects. */ *************** *** 1932,1966 **** /* ! * hasBlobs: * Test whether database contains any large objects */ ! static bool ! hasBlobs(Archive *AH) { ! bool result; const char *blobQry; PGresult *res; /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); /* Check for BLOB OIDs */ if (AH->remoteVersion >= 80500) ! blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1"; else if (AH->remoteVersion >= 70100) ! blobQry = "SELECT loid FROM pg_largeobject LIMIT 1"; else ! blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1"; res = PQexec(g_conn, blobQry); check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK); ! result = PQntuples(res) > 0; PQclear(res); - - return result; } /* --- 1916,1980 ---- /* ! 
* getBlobs: * Test whether database contains any large objects + * If exist, it adds BlobsInfo objects for each owners */ ! static void ! getBlobs(Archive *AH) { ! BlobsInfo *blobobj; ! BlobsInfo *blobcobj; ! BlobsInfo *blobaobj; const char *blobQry; PGresult *res; + int i; /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); /* Check for BLOB OIDs */ if (AH->remoteVersion >= 80500) ! blobQry = "SELECT DISTINCT pg_get_userbyid(lomowner)" ! " FROM pg_largeobject_metadata"; else if (AH->remoteVersion >= 70100) ! blobQry = "SELECT NULL FROM pg_largeobject LIMIT 1"; else ! blobQry = "SELECT NULL FROM pg_class WHERE relkind = 'l' LIMIT 1"; res = PQexec(g_conn, blobQry); check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK); ! for (i = 0; i < PQntuples(res); i++) ! { ! blobobj = (BlobsInfo *) malloc(sizeof(BlobsInfo)); ! blobobj->dobj.objType = DO_BLOBS; ! blobobj->dobj.catId = nilCatalogId; ! AssignDumpId(&blobobj->dobj); ! blobobj->dobj.name = strdup("BLOBS"); ! blobobj->rolname = strdup(PQgetvalue(res, i, 0)); + blobcobj = (BlobsInfo *) malloc(sizeof(BlobsInfo)); + blobcobj->dobj.objType = DO_BLOB_COMMENTS; + blobcobj->dobj.catId = nilCatalogId; + AssignDumpId(&blobcobj->dobj); + blobcobj->dobj.name = strdup("BLOB COMMENTS"); + blobcobj->rolname = strdup(PQgetvalue(res, i, 0)); + addObjectDependency(&blobcobj->dobj, blobobj->dobj.dumpId); + + if (AH->remoteVersion < 80500 || dataOnly || aclsSkip) + continue; + + blobaobj = (BlobsInfo *) malloc(sizeof(BlobsInfo)); + blobaobj->dobj.objType = DO_BLOB_ACLS; + blobaobj->dobj.catId = nilCatalogId; + AssignDumpId(&blobaobj->dobj); + blobaobj->dobj.name = strdup("BLOB ACLS"); + blobaobj->rolname = strdup(PQgetvalue(res, i, 0)); + addObjectDependency(&blobaobj->dobj, blobobj->dobj.dumpId); + } + PQclear(res); } /* *************** *** 1970,1977 **** static int dumpBlobs(Archive *AH, void *arg) { ! const char *blobQry; ! 
const char *blobFetchQry; PGresult *res; char buf[LOBBUFSIZE]; int i; --- 1984,1993 ---- static int dumpBlobs(Archive *AH, void *arg) { ! BlobsInfo *binfo = (BlobsInfo *)arg; ! PQExpBuffer blobQry = createPQExpBuffer(); ! const char *blobFetchQry = "FETCH 1000 IN bloboid"; ! const char *blobCloseQry = "CLOSE bloboid"; PGresult *res; char buf[LOBBUFSIZE]; int i; *************** *** 1985,2002 **** /* Cursor to get all BLOB OIDs */ if (AH->remoteVersion >= 80500) ! blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata"; else if (AH->remoteVersion >= 70100) ! blobQry = "DECLARE bloboid CURSOR FOR SELECT DISTINCT loid FROM pg_largeobject"; else ! blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_class WHERE relkind = 'l'"; ! res = PQexec(g_conn, blobQry); ! check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK); - /* Command to fetch from cursor */ - blobFetchQry = "FETCH 1000 IN bloboid"; - do { PQclear(res); --- 2001,2020 ---- /* Cursor to get all BLOB OIDs */ if (AH->remoteVersion >= 80500) ! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR " ! "SELECT oid, lomacl FROM pg_largeobject_metadata " ! "WHERE pg_get_userbyid(lomowner) = '%s'", ! binfo->rolname); else if (AH->remoteVersion >= 70100) ! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR " ! "SELECT DISTINCT loid, NULL FROM pg_largeobject"); else ! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR " ! "SELECT oid, NULL FROM pg_class WHERE relkind = 'l'"); ! res = PQexec(g_conn, blobQry->data); ! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK); do { PQclear(res); *************** *** 2045,2058 **** PQclear(res); return 1; } /* * dumpBlobComments ! * dump all blob properties. ! * It has "BLOB COMMENTS" tag due to the historical reason, but note ! * that it is the routine to dump all the properties of blobs. * * Since we don't provide any way to be selective about dumping blobs, * there's no need to be selective about their comments either. 
We put --- 2063,2078 ---- PQclear(res); + /* Cleanup cursor */ + res = PQexec(g_conn, blobCloseQry); + check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK); + return 1; } /* * dumpBlobComments ! * dump all blob comments. * * Since we don't provide any way to be selective about dumping blobs, * there's no need to be selective about their comments either. We put *************** *** 2061,2069 **** static int dumpBlobComments(Archive *AH, void *arg) { ! const char *blobQry; ! const char *blobFetchQry; PQExpBuffer cmdQry = createPQExpBuffer(); PGresult *res; int i; --- 2081,2091 ---- static int dumpBlobComments(Archive *AH, void *arg) { ! char *rolname = ((BlobsInfo *)arg)->rolname; PQExpBuffer cmdQry = createPQExpBuffer(); + PQExpBuffer blobQry = createPQExpBuffer(); + const char *blobFetchQry = "FETCH 100 IN blobcmt"; + const char *blobCloseQry = "CLOSE blobcmt"; PGresult *res; int i; *************** *** 2075,2113 **** /* Cursor to get all BLOB comments */ if (AH->remoteVersion >= 80500) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " ! "obj_description(oid, 'pg_largeobject'), " ! "pg_get_userbyid(lomowner), lomacl " ! "FROM pg_largeobject_metadata"; else if (AH->remoteVersion >= 70300) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject'), NULL, NULL " ! "FROM (SELECT DISTINCT loid FROM " ! "pg_description d JOIN pg_largeobject l ON (objoid = loid) " ! "WHERE classoid = 'pg_largeobject'::regclass) ss"; else if (AH->remoteVersion >= 70200) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject'), NULL, NULL " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; else if (AH->remoteVersion >= 70100) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid), NULL, NULL " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; else ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " ! " ( " ! " SELECT description " ! " FROM pg_description pd " ! 
" WHERE pd.objoid=pc.oid " ! " ), NULL, NULL " ! "FROM pg_class pc WHERE relkind = 'l'"; ! res = PQexec(g_conn, blobQry); ! check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK); - /* Command to fetch from cursor */ - blobFetchQry = "FETCH 100 IN blobcmt"; - do { PQclear(res); --- 2097,2137 ---- /* Cursor to get all BLOB comments */ if (AH->remoteVersion >= 80500) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT oid, " ! "obj_description(oid, 'pg_largeobject') " ! "FROM pg_largeobject_metadata " ! "WHERE pg_get_userbyid(lomowner) = '%s'", rolname); else if (AH->remoteVersion >= 70300) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject') " ! "FROM (SELECT DISTINCT loid FROM " ! "pg_description d JOIN pg_largeobject l ON (objoid = loid) " ! "WHERE classoid = 'pg_largeobject'::regclass) ss"); else if (AH->remoteVersion >= 70200) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject') " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"); else if (AH->remoteVersion >= 70100) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid) " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"); else ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT oid, " ! " ( " ! " SELECT description " ! " FROM pg_description pd " ! " WHERE pd.objoid=pc.oid " ! " ) " ! "FROM pg_class pc WHERE relkind = 'l'"); ! res = PQexec(g_conn, blobQry->data); ! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK); do { PQclear(res); *************** *** 2121,2169 **** { Oid blobOid = atooid(PQgetvalue(res, i, 0)); char *lo_comment = PQgetvalue(res, i, 1); - char *lo_owner = PQgetvalue(res, i, 2); - char *lo_acl = PQgetvalue(res, i, 3); - char lo_name[32]; resetPQExpBuffer(cmdQry); ! /* comment on the blob */ ! if (!PQgetisnull(res, i, 1)) ! { ! appendPQExpBuffer(cmdQry, ! 
"COMMENT ON LARGE OBJECT %u IS ", blobOid); ! appendStringLiteralAH(cmdQry, lo_comment, AH); ! appendPQExpBuffer(cmdQry, ";\n"); ! } ! /* dump blob ownership, if necessary */ ! if (!PQgetisnull(res, i, 2)) ! { ! appendPQExpBuffer(cmdQry, ! "ALTER LARGE OBJECT %u OWNER TO %s;\n", ! blobOid, lo_owner); ! } ! /* dump blob privileges, if necessary */ ! if (!PQgetisnull(res, i, 3) && ! !dataOnly && !aclsSkip) ! { ! snprintf(lo_name, sizeof(lo_name), "%u", blobOid); ! if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT", ! lo_acl, lo_owner, "", ! AH->remoteVersion, cmdQry)) ! { ! write_msg(NULL, "could not parse ACL (%s) for " ! "large object %u", lo_acl, blobOid); ! exit_nicely(); ! } ! } ! if (cmdQry->len > 0) { ! appendPQExpBuffer(cmdQry, "\n"); ! archputs(cmdQry->data, AH); } } } while (PQntuples(res) > 0); --- 2145,2244 ---- { Oid blobOid = atooid(PQgetvalue(res, i, 0)); char *lo_comment = PQgetvalue(res, i, 1); + /* comment on the blob, if necessary */ + if (PQgetisnull(res, i, 1)) + continue; + resetPQExpBuffer(cmdQry); ! appendPQExpBuffer(cmdQry, ! "COMMENT ON LARGE OBJECT %u IS ", blobOid); ! appendStringLiteralAH(cmdQry, lo_comment, AH); ! appendPQExpBuffer(cmdQry, ";\n\n"); ! archputs(cmdQry->data, AH); ! } ! } while (PQntuples(res) > 0); ! PQclear(res); ! archputs("\n", AH); ! ! destroyPQExpBuffer(cmdQry); ! ! /* Cleanup cursor */ ! res = PQexec(g_conn, blobCloseQry); ! check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK); ! ! return 1; ! } ! ! /* ! * dumpBlobAcls ! * dump all blob privileges. ! * ! * Since we don't provide any way to be selective about dumping blobs, ! * there's no need to be selective about their privileges either. ! * We put all the privileges into one big TOC entry. ! */ ! static int ! dumpBlobAcls(Archive *AH, void *arg) ! { ! char *rolname = ((BlobsInfo *)arg)->rolname; ! PQExpBuffer cmdQry = createPQExpBuffer(); ! PQExpBuffer blobQry = createPQExpBuffer(); ! const char *blobFetchQry = "FETCH 100 IN blobacl"; ! 
const char *blobCloseQry = "CLOSE blobacl"; ! PGresult *res; ! int i; ! ! if (g_verbose) ! write_msg(NULL, "saving large object permissions\n"); ! ! /* Cursor to get all BLOB permissions */ ! if (AH->remoteVersion >= 80500) ! appendPQExpBuffer(blobQry, "DECLARE blobacl CURSOR FOR " ! "SELECT oid, lomacl FROM pg_largeobject_metadata " ! "WHERE pg_get_userbyid(lomowner) = '%s'", rolname); ! else ! return 1; /* no need to dump anything < 8.5 */ ! ! /* Make sure we are in proper schema */ ! selectSourceSchema("pg_catalog"); ! ! /* Open cursor */ ! res = PQexec(g_conn, blobQry->data); ! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK); ! ! do ! { ! PQclear(res); ! ! /* Do a fetch */ ! res = PQexec(g_conn, blobFetchQry); ! check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK); ! ! /* Process the tuples, if any */ ! for (i = 0; i < PQntuples(res); i++) ! { ! char *lo_oid = PQgetvalue(res, i, 0); ! char *lo_acl = PQgetvalue(res, i, 1); ! ! if (PQgetisnull(res, i, 1)) ! continue; ! ! resetPQExpBuffer(cmdQry); ! ! if (!buildACLCommands(lo_oid, NULL, "LARGE OBJECT", ! lo_acl, rolname, "", ! AH->remoteVersion, cmdQry)) { ! write_msg(NULL, "could not parse ACL (%s) for " ! "large object %s", lo_acl, lo_oid); ! exit_nicely(); } + archprintf(AH, "%s\n", cmdQry->data); } } while (PQntuples(res) > 0); *************** *** 2173,2178 **** --- 2248,2257 ---- destroyPQExpBuffer(cmdQry); + /* Cleanup */ + res = PQexec(g_conn, blobCloseQry); + check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK); + return 1; } *************** *** 6292,6311 **** break; case DO_BLOBS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, "", false, "BLOBS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! dumpBlobs, NULL); break; case DO_BLOB_COMMENTS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, "", false, "BLOB COMMENTS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! 
dumpBlobComments, NULL); break; } } --- 6371,6401 ---- break; case DO_BLOBS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, ! ((BlobsInfo *) dobj)->rolname, false, "BLOBS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! dumpBlobs, dobj); break; case DO_BLOB_COMMENTS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, ! ((BlobsInfo *) dobj)->rolname, false, "BLOB COMMENTS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! dumpBlobComments, dobj); break; + case DO_BLOB_ACLS: + ArchiveEntry(fout, dobj->catId, dobj->dumpId, + dobj->name, NULL, NULL, + ((BlobsInfo *) dobj)->rolname, + false, "BLOB ACLS", SECTION_DATA, + "", "", NULL, + dobj->dependencies, dobj->nDeps, + dumpBlobAcls, dobj); + break; } }
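
For readers of the pg_backup_archiver.c hunk above, the changed condition
can be read as a small predicate. The sketch below is only an
illustration under that reading: takes_blob_restore_path() is a
hypothetical name, not a function in pg_dump, and it mirrors nothing
beyond the condition shown in the hunk (what happens to an entry that
fails the test is outside the hunk and not modeled here).

```c
#include <stdbool.h>
#include <string.h>

/*
 * Returns true if a TOC entry with the given description would satisfy
 * the condition in the hunk: BLOBS and BLOB COMMENTS always do, while
 * BLOB ACLS does so only when ACLs are not being skipped
 * (the aclsSkip flag, which corresponds to --no-privileges).
 */
static bool
takes_blob_restore_path(const char *desc, bool acls_skip)
{
	if (strcmp(desc, "BLOBS") == 0 ||
		strcmp(desc, "BLOB COMMENTS") == 0)
		return true;
	if (strcmp(desc, "BLOB ACLS") == 0)
		return !acls_skip;
	return false;
}
```

Keeping the ACLs in their own section is what makes this test possible:
the decision depends only on the section description, not on the contents
of each statement inside it.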