The attached patch is a revised version. List of updates: - cleanup: getBlobs() was renamed to getBlobOwners() - cleanup: BlobsInfo was renamed to BlobOwnerInfo - bugfix: pg_get_userbyid() in SQLs were replaced by username_subquery which constins a right subquery to obtain a username for the given id. It enables to run pg_dump under the concurrent role deletion. - bugfix: Even if we give -a (--data-only) or -O (--no-owner) options, or archive does not contain Owner information, it tried to write out "ALTER LARGE OBJECT xxx OWNER TO ..." statement. - bugfix: Even if we give -a (--data-only) or -x (--no-privileges) options, it tried to write out "BLOB ACLS" section.
The last two are the problems I noticed newly. It needs to introduce them. The BLOB section can contain multiple definitions of large objects, unlike any other object classes. It is also a reason why I had to group large objects by database user. The Owner tag of BLOB section is used to identify owner of the large objects to be restored, and also used in --use-set-session-authorization mode. However, we need to inject "ALTER LARGE OBJECT xxx OWNER TO ..." statement for each lo_create() in _LoadBlobs(), because we cannot know how many large objects are in the section before reading the archive. But the last patch didn't pay mention about -a/-O option and an archive which does not have Owner: tag. The BLOB ACLS section is categorized to SECTION_DATA, it follows the manner in BLOB COMMENTS behavior. In same reason, it has to handle the -a/-x option by itself, but the last patch didn't handle it well. BTW, here is a known issue. When we run pg_dump with -s(--schema-only), it write out descriptions of regular object classes, but does not write out the description of large objects. It seems to me the description of large objects are considered as a part of data, not properties. However, it might be inconsistent. The reason of this behavior is all the BLOB dumps are categorized to SECTION_DATA, so -s option informs pg_backup_archiver.c to skip routines related to large objects. However, it may be a time to consider this code into two steps. In the schema section stage, - It creates empty large objects with lo_create() - It restores the description of the large objects. - It restores the ownership/privileges of the large objects. In the date section stage, - It loads actual data contents into the empty large objects with lowrite(). Thanks, (2010/01/21 19:42), Takahiro Itagaki wrote: > > KaiGai Kohei<kai...@ak.jp.nec.com> wrote: > >>> I'm not sure whether we need to make groups for each owner of large objects. >>> If I remember right, the primary issue was separating routines for dump >>> BLOB ACLS from routines for BLOB COMMENTS, right? Why did you make the >>> change? >> >> When --use-set-session-authorization is specified, pg_restore tries to >> change database role of the current session just before creation of >> database objects to be restored. >> >> Ownership of the database objects are recorded in the section header, >> and it informs pg_restore who should be owner of the objects to be >> restored in this section. >> >> Then, pg_restore can generate ALTER xxx OWNER TO after creation, or >> SET SESSION AUTHORIZATION before creation in runtime. >> So, we cannot put creation of largeobjects with different ownership >> in same section. >> >> It is the reason why we have to group largeobjects by database user. > > Ah, I see. > > Then... What happen if we drop or rename roles who have large objects > during pg_dump? Does the patch still work? It uses pg_get_userbyid(). > > Regards, > --- > Takahiro Itagaki > NTT Open Source Software Center > > > -- OSS Platform Development Division, NEC KaiGai Kohei <kai...@ak.jp.nec.com>
*** a/src/bin/pg_dump/pg_backup_archiver.c --- b/src/bin/pg_dump/pg_backup_archiver.c *************** *** 517,527 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te, */ if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) { - _printTocEntry(AH, te, ropt, true, false); - if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0) { ahlog(AH, 1, "restoring %s\n", te->desc); _selectOutputSchema(AH, "pg_catalog"); --- 517,535 ---- */ if (AH->PrintTocDataPtr !=NULL && (reqs & REQ_DATA) != 0) { if (strcmp(te->desc, "BLOBS") == 0 || ! strcmp(te->desc, "BLOB COMMENTS") == 0 || ! strcmp(te->desc, "BLOB ACLS") == 0) { + /* + * We don't need to dump ACLs, if -a or -x cases. + */ + if (strcmp(te->desc, "BLOB ACLS") == 0 && + (ropt->aclsSkip || ropt->dataOnly)) + return retval; + + _printTocEntry(AH, te, ropt, true, false); + ahlog(AH, 1, "restoring %s\n", te->desc); _selectOutputSchema(AH, "pg_catalog"); *************** *** 530,535 **** restore_toc_entry(ArchiveHandle *AH, TocEntry *te, --- 538,545 ---- } else { + _printTocEntry(AH, te, ropt, true, false); + _disableTriggersIfNecessary(AH, te, ropt); /* Select owner and schema as necessary */ *** a/src/bin/pg_dump/pg_backup_custom.c --- b/src/bin/pg_dump/pg_backup_custom.c *************** *** 54,60 **** static void _StartBlobs(ArchiveHandle *AH, TocEntry *te); static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); ! static void _LoadBlobs(ArchiveHandle *AH, bool drop); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); --- 54,60 ---- static void _StartBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlob(ArchiveHandle *AH, TocEntry *te, Oid oid); static void _EndBlobs(ArchiveHandle *AH, TocEntry *te); ! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _Clone(ArchiveHandle *AH); static void _DeClone(ArchiveHandle *AH); *************** *** 498,504 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) break; case BLK_BLOBS: ! _LoadBlobs(AH, ropt->dropSchema); break; default: /* Always have a default */ --- 498,504 ---- break; case BLK_BLOBS: ! _LoadBlobs(AH, te, ropt); break; default: /* Always have a default */ *************** *** 619,625 **** _PrintData(ArchiveHandle *AH) } static void ! _LoadBlobs(ArchiveHandle *AH, bool drop) { Oid oid; --- 619,625 ---- } static void ! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { Oid oid; *************** *** 628,636 **** _LoadBlobs(ArchiveHandle *AH, bool drop) oid = ReadInt(AH); while (oid != 0) { ! StartRestoreBlob(AH, oid, drop); _PrintData(AH); EndRestoreBlob(AH, oid); oid = ReadInt(AH); } --- 628,640 ---- oid = ReadInt(AH); while (oid != 0) { ! StartRestoreBlob(AH, oid, ropt->dropSchema); _PrintData(AH); EndRestoreBlob(AH, oid); + if (!ropt->noOwner && !ropt->dataOnly && + !ropt->use_setsessauth && strlen(te->owner) > 0) + ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n", + oid, te->owner); oid = ReadInt(AH); } *** a/src/bin/pg_dump/pg_backup_files.c --- b/src/bin/pg_dump/pg_backup_files.c *************** *** 66,72 **** typedef struct } lclTocEntry; static const char *modulename = gettext_noop("file archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt); static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname); /* --- 66,72 ---- } lclTocEntry; static const char *modulename = gettext_noop("file archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static void _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char *fname); /* *************** *** 330,336 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) return; if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, ropt); else _PrintFileData(AH, tctx->filename, ropt); } --- 330,336 ---- return; if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, te, ropt); else _PrintFileData(AH, tctx->filename, ropt); } *************** *** 365,371 **** _getBlobTocEntry(ArchiveHandle *AH, Oid *oid, char fname[K_STD_BUF_SIZE]) } static void ! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; --- 365,371 ---- } static void ! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; *************** *** 385,390 **** _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt) --- 385,394 ---- StartRestoreBlob(AH, oid, ropt->dropSchema); _PrintFileData(AH, fname, ropt); EndRestoreBlob(AH, oid); + if (!ropt->noOwner && !ropt->dataOnly && + !ropt->use_setsessauth && strlen(te->owner) > 0) + ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n", + oid, te->owner); _getBlobTocEntry(AH, &oid, fname); } *** a/src/bin/pg_dump/pg_backup_tar.c --- b/src/bin/pg_dump/pg_backup_tar.c *************** *** 100,106 **** typedef struct static const char *modulename = gettext_noop("tar archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt); static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode); static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH); --- 100,106 ---- static const char *modulename = gettext_noop("tar archiver"); ! static void _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt); static TAR_MEMBER *tarOpen(ArchiveHandle *AH, const char *filename, char mode); static void tarClose(ArchiveHandle *AH, TAR_MEMBER *TH); *************** *** 696,708 **** _PrintTocData(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) } if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, ropt); else _PrintFileData(AH, tctx->filename, ropt); } static void ! _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; --- 696,708 ---- } if (strcmp(te->desc, "BLOBS") == 0) ! _LoadBlobs(AH, te, ropt); else _PrintFileData(AH, tctx->filename, ropt); } static void ! _LoadBlobs(ArchiveHandle *AH, TocEntry *te, RestoreOptions *ropt) { Oid oid; lclContext *ctx = (lclContext *) AH->formatData; *************** *** 733,738 **** _LoadBlobs(ArchiveHandle *AH, RestoreOptions *ropt) --- 733,742 ---- ahwrite(buf, 1, cnt, AH); } EndRestoreBlob(AH, oid); + if (!ropt->noOwner && !ropt->dataOnly && + !ropt->use_setsessauth && strlen(te->owner) > 0) + ahprintf(AH, "ALTER LARGE OBJECT %u OWNER TO %s;\n\n", + oid, te->owner); foundBlob = true; } tarClose(AH, th); *** a/src/bin/pg_dump/pg_dump.c --- b/src/bin/pg_dump/pg_dump.c *************** *** 190,198 **** static void selectSourceSchema(const char *schemaName); static char *getFormattedTypeName(Oid oid, OidOptions opts); static char *myFormatType(const char *typname, int32 typmod); static const char *fmtQualifiedId(const char *schema, const char *id); ! static bool hasBlobs(Archive *AH); static int dumpBlobs(Archive *AH, void *arg); static int dumpBlobComments(Archive *AH, void *arg); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); --- 190,199 ---- static char *getFormattedTypeName(Oid oid, OidOptions opts); static char *myFormatType(const char *typname, int32 typmod); static const char *fmtQualifiedId(const char *schema, const char *id); ! static void getBlobOwners(Archive *AH); static int dumpBlobs(Archive *AH, void *arg); static int dumpBlobComments(Archive *AH, void *arg); + static int dumpBlobAcls(Archive *AH, void *arg); static void dumpDatabase(Archive *AH); static void dumpEncoding(Archive *AH); static void dumpStdStrings(Archive *AH); *************** *** 701,725 **** main(int argc, char **argv) getTableDataFKConstraints(); } ! if (outputBlobs && hasBlobs(g_fout)) ! { ! /* Add placeholders to allow correct sorting of blobs */ ! DumpableObject *blobobj; ! DumpableObject *blobcobj; ! ! blobobj = (DumpableObject *) malloc(sizeof(DumpableObject)); ! blobobj->objType = DO_BLOBS; ! blobobj->catId = nilCatalogId; ! AssignDumpId(blobobj); ! blobobj->name = strdup("BLOBS"); ! ! blobcobj = (DumpableObject *) malloc(sizeof(DumpableObject)); ! blobcobj->objType = DO_BLOB_COMMENTS; ! blobcobj->catId = nilCatalogId; ! AssignDumpId(blobcobj); ! blobcobj->name = strdup("BLOB COMMENTS"); ! addObjectDependency(blobcobj, blobobj->dumpId); ! } /* * Collect dependency data to assist in ordering the objects. --- 702,709 ---- getTableDataFKConstraints(); } ! if (outputBlobs) ! getBlobOwners(g_fout); /* * Collect dependency data to assist in ordering the objects. *************** *** 1938,1972 **** dumpStdStrings(Archive *AH) /* ! * hasBlobs: * Test whether database contains any large objects */ ! static bool ! hasBlobs(Archive *AH) { ! bool result; ! const char *blobQry; ! PGresult *res; /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); /* Check for BLOB OIDs */ if (AH->remoteVersion >= 80500) ! blobQry = "SELECT oid FROM pg_largeobject_metadata LIMIT 1"; else if (AH->remoteVersion >= 70100) ! blobQry = "SELECT loid FROM pg_largeobject LIMIT 1"; else ! blobQry = "SELECT oid FROM pg_class WHERE relkind = 'l' LIMIT 1"; ! res = PQexec(g_conn, blobQry); ! check_sql_result(res, g_conn, blobQry, PGRES_TUPLES_OK); ! result = PQntuples(res) > 0; PQclear(res); - - return result; } /* --- 1922,1990 ---- /* ! * getBlobOwners: * Test whether database contains any large objects + * If exist, it adds BlobOwnerInfo objects for each owners */ ! static void ! getBlobOwners(Archive *AH) { ! PQExpBuffer blobQry = createPQExpBuffer(); ! BlobOwnerInfo *blobobj; ! BlobOwnerInfo *blobcobj; ! BlobOwnerInfo *blobaobj; ! PGresult *res; ! int i; /* Make sure we are in proper schema */ selectSourceSchema("pg_catalog"); /* Check for BLOB OIDs */ if (AH->remoteVersion >= 80500) ! appendPQExpBuffer(blobQry, ! "SELECT DISTINCT (%s lomowner)" ! " FROM pg_largeobject_metadata", ! username_subquery); else if (AH->remoteVersion >= 70100) ! appendPQExpBuffer(blobQry, ! "SELECT NULL FROM pg_largeobject LIMIT 1"); else ! appendPQExpBuffer(blobQry, ! "SELECT NULL FROM pg_class WHERE relkind = 'l' LIMIT 1"); ! res = PQexec(g_conn, blobQry->data); ! check_sql_result(res, g_conn, blobQry->data, PGRES_TUPLES_OK); ! for (i = 0; i < PQntuples(res); i++) ! { ! blobobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo)); ! blobobj->dobj.objType = DO_BLOBS; ! blobobj->dobj.catId = nilCatalogId; ! AssignDumpId(&blobobj->dobj); ! blobobj->dobj.name = strdup("BLOBS"); ! blobobj->rolname = strdup(PQgetvalue(res, i, 0)); ! ! blobcobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo)); ! blobcobj->dobj.objType = DO_BLOB_COMMENTS; ! blobcobj->dobj.catId = nilCatalogId; ! AssignDumpId(&blobcobj->dobj); ! blobcobj->dobj.name = strdup("BLOB COMMENTS"); ! blobcobj->rolname = strdup(PQgetvalue(res, i, 0)); ! addObjectDependency(&blobcobj->dobj, blobobj->dobj.dumpId); ! ! if (AH->remoteVersion >= 80500 && !dataOnly && !aclsSkip) ! { ! blobaobj = (BlobOwnerInfo *) malloc(sizeof(BlobOwnerInfo)); ! blobaobj->dobj.objType = DO_BLOB_ACLS; ! blobaobj->dobj.catId = nilCatalogId; ! AssignDumpId(&blobaobj->dobj); ! blobaobj->dobj.name = strdup("BLOB ACLS"); ! blobaobj->rolname = strdup(PQgetvalue(res, i, 0)); ! addObjectDependency(&blobaobj->dobj, blobobj->dobj.dumpId); ! } ! } PQclear(res); } /* *************** *** 1976,1983 **** hasBlobs(Archive *AH) static int dumpBlobs(Archive *AH, void *arg) { ! const char *blobQry; ! const char *blobFetchQry; PGresult *res; char buf[LOBBUFSIZE]; int i; --- 1994,2003 ---- static int dumpBlobs(Archive *AH, void *arg) { ! char *rolname = ((BlobOwnerInfo *)arg)->rolname; ! PQExpBuffer blobQry = createPQExpBuffer(); ! const char *blobFetchQry = "FETCH 1000 IN bloboid"; ! const char *blobCloseQry = "CLOSE bloboid"; PGresult *res; char buf[LOBBUFSIZE]; int i; *************** *** 1991,2007 **** dumpBlobs(Archive *AH, void *arg) /* Cursor to get all BLOB OIDs */ if (AH->remoteVersion >= 80500) ! blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_largeobject_metadata"; else if (AH->remoteVersion >= 70100) ! blobQry = "DECLARE bloboid CURSOR FOR SELECT DISTINCT loid FROM pg_largeobject"; else ! blobQry = "DECLARE bloboid CURSOR FOR SELECT oid FROM pg_class WHERE relkind = 'l'"; ! res = PQexec(g_conn, blobQry); ! check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK); ! ! /* Command to fetch from cursor */ ! blobFetchQry = "FETCH 1000 IN bloboid"; do { --- 2011,2029 ---- /* Cursor to get all BLOB OIDs */ if (AH->remoteVersion >= 80500) ! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR " ! "SELECT oid, lomacl FROM pg_largeobject_metadata " ! "WHERE '%s' in (%s lomowner)", ! rolname, username_subquery); else if (AH->remoteVersion >= 70100) ! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR " ! "SELECT DISTINCT loid, NULL FROM pg_largeobject"); else ! appendPQExpBuffer(blobQry, "DECLARE bloboid CURSOR FOR " ! "SELECT oid, NULL FROM pg_class WHERE relkind = 'l'"); ! res = PQexec(g_conn, blobQry->data); ! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK); do { *************** *** 2051,2064 **** dumpBlobs(Archive *AH, void *arg) PQclear(res); return 1; } /* * dumpBlobComments ! * dump all blob properties. ! * It has "BLOB COMMENTS" tag due to the historical reason, but note ! * that it is the routine to dump all the properties of blobs. * * Since we don't provide any way to be selective about dumping blobs, * there's no need to be selective about their comments either. We put --- 2073,2088 ---- PQclear(res); + /* Cleanup cursor */ + res = PQexec(g_conn, blobCloseQry); + check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK); + return 1; } /* * dumpBlobComments ! * dump all blob comments. * * Since we don't provide any way to be selective about dumping blobs, * there's no need to be selective about their comments either. We put *************** *** 2067,2075 **** dumpBlobs(Archive *AH, void *arg) static int dumpBlobComments(Archive *AH, void *arg) { ! const char *blobQry; ! const char *blobFetchQry; PQExpBuffer cmdQry = createPQExpBuffer(); PGresult *res; int i; --- 2091,2101 ---- static int dumpBlobComments(Archive *AH, void *arg) { ! char *rolname = ((BlobOwnerInfo *)arg)->rolname; PQExpBuffer cmdQry = createPQExpBuffer(); + PQExpBuffer blobQry = createPQExpBuffer(); + const char *blobFetchQry = "FETCH 100 IN blobcmt"; + const char *blobCloseQry = "CLOSE blobcmt"; PGresult *res; int i; *************** *** 2081,2118 **** dumpBlobComments(Archive *AH, void *arg) /* Cursor to get all BLOB comments */ if (AH->remoteVersion >= 80500) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " ! "obj_description(oid, 'pg_largeobject'), " ! "pg_get_userbyid(lomowner), lomacl " ! "FROM pg_largeobject_metadata"; else if (AH->remoteVersion >= 70300) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject'), NULL, NULL " ! "FROM (SELECT DISTINCT loid FROM " ! "pg_description d JOIN pg_largeobject l ON (objoid = loid) " ! "WHERE classoid = 'pg_largeobject'::regclass) ss"; else if (AH->remoteVersion >= 70200) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject'), NULL, NULL " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; else if (AH->remoteVersion >= 70100) ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid), NULL, NULL " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"; else ! blobQry = "DECLARE blobcmt CURSOR FOR SELECT oid, " ! " ( " ! " SELECT description " ! " FROM pg_description pd " ! " WHERE pd.objoid=pc.oid " ! " ), NULL, NULL " ! "FROM pg_class pc WHERE relkind = 'l'"; ! ! res = PQexec(g_conn, blobQry); ! check_sql_result(res, g_conn, blobQry, PGRES_COMMAND_OK); ! ! /* Command to fetch from cursor */ ! blobFetchQry = "FETCH 100 IN blobcmt"; do { --- 2107,2147 ---- /* Cursor to get all BLOB comments */ if (AH->remoteVersion >= 80500) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT oid, " ! "obj_description(oid, 'pg_largeobject') " ! "FROM pg_largeobject_metadata " ! "WHERE '%s' in (%s lomowner)", ! rolname, username_subquery); else if (AH->remoteVersion >= 70300) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject') " ! "FROM (SELECT DISTINCT loid FROM " ! "pg_description d JOIN pg_largeobject l ON (objoid = loid) " ! "WHERE classoid = 'pg_largeobject'::regclass) ss"); else if (AH->remoteVersion >= 70200) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid, 'pg_largeobject') " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"); else if (AH->remoteVersion >= 70100) ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT loid, " ! "obj_description(loid) " ! "FROM (SELECT DISTINCT loid FROM pg_largeobject) ss"); else ! appendPQExpBuffer(blobQry, ! "DECLARE blobcmt CURSOR FOR SELECT oid, " ! " ( " ! " SELECT description " ! " FROM pg_description pd " ! " WHERE pd.objoid=pc.oid " ! " ) " ! "FROM pg_class pc WHERE relkind = 'l'"); ! ! res = PQexec(g_conn, blobQry->data); ! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK); do { *************** *** 2127,2175 **** dumpBlobComments(Archive *AH, void *arg) { Oid blobOid = atooid(PQgetvalue(res, i, 0)); char *lo_comment = PQgetvalue(res, i, 1); ! char *lo_owner = PQgetvalue(res, i, 2); ! char *lo_acl = PQgetvalue(res, i, 3); ! char lo_name[32]; resetPQExpBuffer(cmdQry); ! /* comment on the blob */ ! if (!PQgetisnull(res, i, 1)) ! { ! appendPQExpBuffer(cmdQry, ! "COMMENT ON LARGE OBJECT %u IS ", blobOid); ! appendStringLiteralAH(cmdQry, lo_comment, AH); ! appendPQExpBuffer(cmdQry, ";\n"); ! } ! /* dump blob ownership, if necessary */ ! if (!PQgetisnull(res, i, 2)) ! { ! appendPQExpBuffer(cmdQry, ! "ALTER LARGE OBJECT %u OWNER TO %s;\n", ! blobOid, lo_owner); ! } ! /* dump blob privileges, if necessary */ ! if (!PQgetisnull(res, i, 3) && ! !dataOnly && !aclsSkip) ! { ! snprintf(lo_name, sizeof(lo_name), "%u", blobOid); ! if (!buildACLCommands(lo_name, NULL, "LARGE OBJECT", ! lo_acl, lo_owner, "", ! AH->remoteVersion, cmdQry)) ! { ! write_msg(NULL, "could not parse ACL (%s) for " ! "large object %u", lo_acl, blobOid); ! exit_nicely(); ! } ! } ! if (cmdQry->len > 0) { ! appendPQExpBuffer(cmdQry, "\n"); ! archputs(cmdQry->data, AH); } } } while (PQntuples(res) > 0); --- 2156,2256 ---- { Oid blobOid = atooid(PQgetvalue(res, i, 0)); char *lo_comment = PQgetvalue(res, i, 1); ! ! /* comment on the blob, if necessary */ ! if (PQgetisnull(res, i, 1)) ! continue; resetPQExpBuffer(cmdQry); ! appendPQExpBuffer(cmdQry, ! "COMMENT ON LARGE OBJECT %u IS ", blobOid); ! appendStringLiteralAH(cmdQry, lo_comment, AH); ! appendPQExpBuffer(cmdQry, ";\n\n"); ! archputs(cmdQry->data, AH); ! } ! } while (PQntuples(res) > 0); ! PQclear(res); ! ! archputs("\n", AH); ! ! destroyPQExpBuffer(cmdQry); ! ! /* Cleanup cursor */ ! res = PQexec(g_conn, blobCloseQry); ! check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK); ! ! return 1; ! } ! ! /* ! * dumpBlobAcls ! * dump all blob privileges. ! * ! * Since we don't provide any way to be selective about dumping blobs, ! * there's no need to be selective about their privileges either. ! * We put all the privileges into one big TOC entry. ! */ ! static int ! dumpBlobAcls(Archive *AH, void *arg) ! { ! char *rolname = ((BlobOwnerInfo *)arg)->rolname; ! PQExpBuffer cmdQry = createPQExpBuffer(); ! PQExpBuffer blobQry = createPQExpBuffer(); ! const char *blobFetchQry = "FETCH 100 IN blobacl"; ! const char *blobCloseQry = "CLOSE blobacl"; ! PGresult *res; ! int i; ! ! if (g_verbose) ! write_msg(NULL, "saving large object permissions\n"); ! ! /* Cursor to get all BLOB permissions */ ! if (AH->remoteVersion >= 80500) ! appendPQExpBuffer(blobQry, "DECLARE blobacl CURSOR FOR " ! "SELECT oid, lomacl FROM pg_largeobject_metadata " ! "WHERE '%s' in (%s lomowner)", ! rolname, username_subquery); ! else ! return 1; /* no need to dump anything < 8.5 */ ! ! /* Make sure we are in proper schema */ ! selectSourceSchema("pg_catalog"); ! ! /* Open cursor */ ! res = PQexec(g_conn, blobQry->data); ! check_sql_result(res, g_conn, blobQry->data, PGRES_COMMAND_OK); ! ! do ! { ! PQclear(res); ! ! /* Do a fetch */ ! res = PQexec(g_conn, blobFetchQry); ! check_sql_result(res, g_conn, blobFetchQry, PGRES_TUPLES_OK); ! ! /* Process the tuples, if any */ ! for (i = 0; i < PQntuples(res); i++) ! { ! char *lo_oid = PQgetvalue(res, i, 0); ! char *lo_acl = PQgetvalue(res, i, 1); ! if (PQgetisnull(res, i, 1)) ! continue; ! ! resetPQExpBuffer(cmdQry); ! ! if (!buildACLCommands(lo_oid, NULL, "LARGE OBJECT", ! lo_acl, rolname, "", ! AH->remoteVersion, cmdQry)) { ! write_msg(NULL, "could not parse ACL (%s) for " ! "large object %s", lo_acl, lo_oid); ! exit_nicely(); } + archprintf(AH, "%s\n", cmdQry->data); } } while (PQntuples(res) > 0); *************** *** 2179,2184 **** dumpBlobComments(Archive *AH, void *arg) --- 2260,2269 ---- destroyPQExpBuffer(cmdQry); + /* Cleanup */ + res = PQexec(g_conn, blobCloseQry); + check_sql_result(res, g_conn, blobCloseQry, PGRES_COMMAND_OK); + return 1; } *************** *** 6480,6498 **** dumpDumpableObject(Archive *fout, DumpableObject *dobj) break; case DO_BLOBS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, "", false, "BLOBS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! dumpBlobs, NULL); break; case DO_BLOB_COMMENTS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, "", false, "BLOB COMMENTS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! dumpBlobComments, NULL); break; } } --- 6565,6594 ---- break; case DO_BLOBS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, ! ((BlobOwnerInfo *) dobj)->rolname, false, "BLOBS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! dumpBlobs, dobj); break; case DO_BLOB_COMMENTS: ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, ! ((BlobOwnerInfo *) dobj)->rolname, false, "BLOB COMMENTS", SECTION_DATA, "", "", NULL, dobj->dependencies, dobj->nDeps, ! dumpBlobComments, dobj); ! break; ! case DO_BLOB_ACLS: ! ArchiveEntry(fout, dobj->catId, dobj->dumpId, ! dobj->name, NULL, NULL, ! ((BlobOwnerInfo *) dobj)->rolname, ! false, "BLOB ACLS", SECTION_DATA, ! "", "", NULL, ! dobj->dependencies, dobj->nDeps, ! dumpBlobAcls, dobj); break; } } *** a/src/bin/pg_dump/pg_dump.h --- b/src/bin/pg_dump/pg_dump.h *************** *** 116,122 **** typedef enum DO_FOREIGN_SERVER, DO_DEFAULT_ACL, DO_BLOBS, ! DO_BLOB_COMMENTS } DumpableObjectType; typedef struct _dumpableObject --- 116,123 ---- DO_FOREIGN_SERVER, DO_DEFAULT_ACL, DO_BLOBS, ! DO_BLOB_COMMENTS, ! DO_BLOB_ACLS, } DumpableObjectType; typedef struct _dumpableObject *************** *** 442,447 **** typedef struct _defaultACLInfo --- 443,454 ---- char *defaclacl; } DefaultACLInfo; + typedef struct _blobOwnerInfo + { + DumpableObject dobj; + char *rolname; + } BlobOwnerInfo; + /* global decls */ extern bool force_quotes; /* double-quotes for identifiers flag */ extern bool g_verbose; /* verbose flag */ *** a/src/bin/pg_dump/pg_dump_sort.c --- b/src/bin/pg_dump/pg_dump_sort.c *************** *** 93,99 **** static const int newObjectTypePriority[] = 15, /* DO_FOREIGN_SERVER */ 27, /* DO_DEFAULT_ACL */ 20, /* DO_BLOBS */ ! 21 /* DO_BLOB_COMMENTS */ }; --- 93,100 ---- 15, /* DO_FOREIGN_SERVER */ 27, /* DO_DEFAULT_ACL */ 20, /* DO_BLOBS */ ! 21, /* DO_BLOB_COMMENTS */ ! 28, /* DO_BLOB_ACLS */ }; *************** *** 1156,1161 **** describeDumpableObject(DumpableObject *obj, char *buf, int bufsize) --- 1157,1167 ---- "BLOB COMMENTS (ID %d)", obj->dumpId); return; + case DO_BLOB_ACLS: + snprintf(buf, bufsize, + "BLOB ACLS (ID %d)", + obj->dumpId); + return; } /* shouldn't get here */ snprintf(buf, bufsize,
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers