On 2016-12-10 08:48:55 +0100, Petr Jelinek wrote: > diff --git a/src/backend/catalog/pg_publication.c > b/src/backend/catalog/pg_publication.c > new file mode 100644 > index 0000000..e3560b7 > --- /dev/null > +++ b/src/backend/catalog/pg_publication.c > + > +Datum pg_get_publication_tables(PG_FUNCTION_ARGS);
Don't we usually put these in a header? > +/* > + * Insert new publication / relation mapping. > + */ > +ObjectAddress > +publication_add_relation(Oid pubid, Relation targetrel, > + bool if_not_exists) > +{ > + Relation rel; > + HeapTuple tup; > + Datum values[Natts_pg_publication_rel]; > + bool nulls[Natts_pg_publication_rel]; > + Oid relid = RelationGetRelid(targetrel); > + Oid prrelid; > + Publication *pub = GetPublication(pubid); > + ObjectAddress myself, > + referenced; > + > + rel = heap_open(PublicationRelRelationId, RowExclusiveLock); > + > + /* Check for duplicates */ Maybe mention that that check is racy, but a unique index protects against the race? > + /* Insert tuple into catalog. */ > + prrelid = simple_heap_insert(rel, tup); > + CatalogUpdateIndexes(rel, tup); > + heap_freetuple(tup); > + > + ObjectAddressSet(myself, PublicationRelRelationId, prrelid); > + > + /* Add dependency on the publication */ > + ObjectAddressSet(referenced, PublicationRelationId, pubid); > + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); > + > + /* Add dependency on the relation */ > + ObjectAddressSet(referenced, RelationRelationId, relid); > + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); > + > + /* Close the table. */ > + heap_close(rel, RowExclusiveLock); I'm not quite sure about the policy, but shouldn't we invoke InvokeObjectPostCreateHook etc here? > +/* > + * Gets list of relation oids for a publication. > + * > + * This should only be used for normal publications, the FOR ALL TABLES > + * should use GetAllTablesPublicationRelations(). > + */ > +List * > +GetPublicationRelations(Oid pubid) > +{ > + List *result; > + Relation pubrelsrel; > + ScanKeyData scankey; > + SysScanDesc scan; > + HeapTuple tup; > + > + /* Find all publications associated with the relation. 
*/ > + pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock); > + > + ScanKeyInit(&scankey, > + Anum_pg_publication_rel_prpubid, > + BTEqualStrategyNumber, F_OIDEQ, > + ObjectIdGetDatum(pubid)); > + > + scan = systable_beginscan(pubrelsrel, PublicationRelMapIndexId, true, > + NULL, 1, &scankey); > + > + result = NIL; > + while (HeapTupleIsValid(tup = systable_getnext(scan))) > + { > + Form_pg_publication_rel pubrel; > + > + pubrel = (Form_pg_publication_rel) GETSTRUCT(tup); > + > + result = lappend_oid(result, pubrel->prrelid); > + } > + > + systable_endscan(scan); > + heap_close(pubrelsrel, NoLock); In other parts of this you drop the lock, but not here? > + heap_close(rel, NoLock); > + > + return result; > +} and here. > +/* > + * Gets list of all relation published by FOR ALL TABLES publication(s). > + */ > +List * > +GetAllTablesPublicationRelations(void) > +{ > + Relation classRel; > + ScanKeyData key[1]; > + HeapScanDesc scan; > + HeapTuple tuple; > + List *result = NIL; > + > + classRel = heap_open(RelationRelationId, AccessShareLock); > + heap_endscan(scan); > + heap_close(classRel, AccessShareLock); > + > + return result; > +} but here. Btw, why are matviews not publishable? > +/* > + * Get Publication using name. > + */ > +Publication * > +GetPublicationByName(const char *pubname, bool missing_ok) > +{ > + Oid oid; > + > + oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname)); > + if (!OidIsValid(oid)) > + { > + if (missing_ok) > + return NULL; > + > + ereport(ERROR, > + (errcode(ERRCODE_UNDEFINED_OBJECT), > + errmsg("publication \"%s\" does not exist", > pubname))); > + } > + > + return GetPublication(oid); > +} That's racy... Also, shouldn't we specify how to deal with the returned memory for Publication * returning methods? 
> diff --git a/src/backend/commands/publicationcmds.c > b/src/backend/commands/publicationcmds.c > new file mode 100644 > index 0000000..954b2bd > --- /dev/null > +++ b/src/backend/commands/publicationcmds.c > @@ -0,0 +1,613 @@ > +/* > + * Create new publication. > + */ > +ObjectAddress > +CreatePublication(CreatePublicationStmt *stmt) > +{ > + Relation rel; > + > + values[Anum_pg_publication_puballtables - 1] = > + BoolGetDatum(stmt->for_all_tables); > + values[Anum_pg_publication_pubinsert - 1] = > + BoolGetDatum(publish_insert); > + values[Anum_pg_publication_pubupdate - 1] = > + BoolGetDatum(publish_update); > + values[Anum_pg_publication_pubdelete - 1] = > + BoolGetDatum(publish_delete); I remain convinced that a different representation would be better. There'll be more options over time (truncate, DDL at least). > +static void > +AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel, > + HeapTuple tup) > +{ > + bool publish_insert_given; > + bool publish_update_given; > + bool publish_delete_given; > + bool publish_insert; > + bool publish_update; > + bool publish_delete; > + ObjectAddress obj; > + > + parse_publication_options(stmt->options, > + > &publish_insert_given, &publish_insert, > + > &publish_update_given, &publish_update, > + > &publish_delete_given, &publish_delete); You could pass it a struct instead... 
> +static List * > +OpenTableList(List *tables) > +{ > + List *relids = NIL; > + List *rels = NIL; > + ListCell *lc; > + > + /* > + * Open, share-lock, and check all the explicitly-specified relations > + */ > + foreach(lc, tables) > + { > + RangeVar *rv = lfirst(lc); > + Relation rel; > + bool recurse = interpretInhOption(rv->inhOpt); > + Oid myrelid; > + > + rel = heap_openrv(rv, ShareUpdateExclusiveLock); > + myrelid = RelationGetRelid(rel); > + /* filter out duplicates when user specifies "foo, foo" */ > + if (list_member_oid(relids, myrelid)) > + { > + heap_close(rel, ShareUpdateExclusiveLock); > + continue; > + } This is a quadratic algorithm - that could bite us... Not sure if we need to care. If we want to fix it, one approach would be to use RangeVarGetRelid() instead, and then do a qsort/deduplicate before actually opening the relations. > > -def_elem: ColLabel '=' def_arg > +def_elem: def_key '=' def_arg > { > $$ = makeDefElem($1, (Node *) $3, @1); > } > - | ColLabel > + | def_key > { > $$ = makeDefElem($1, NULL, @1); > } > ; > +def_key: > + ColLabel > { $$ = $1; } > + | ColLabel ColLabel { $$ = > psprintf("%s %s", $1, $2); } > + ; > + Not quite sure what this is about? Doesn't that change the accepted syntax in a bunch of places? > @@ -2337,6 +2338,8 @@ RelationDestroyRelation(Relation relation, bool > remember_tupdesc) > bms_free(relation->rd_indexattr); > bms_free(relation->rd_keyattr); > bms_free(relation->rd_idattr); > + if (relation->rd_pubactions) > + pfree(relation->rd_pubactions); > if (relation->rd_options) > pfree(relation->rd_options); > if (relation->rd_indextuple) > @@ -4992,6 +4995,67 @@ RelationGetExclusionInfo(Relation indexRelation, > MemoryContextSwitchTo(oldcxt); > } > > +/* > + * Get publication actions for the given relation. 
> + */ > +struct PublicationActions * > +GetRelationPublicationActions(Relation relation) > +{ > + List *puboids; > + ListCell *lc; > + MemoryContext oldcxt; > + PublicationActions *pubactions = palloc0(sizeof(PublicationActions)); > + > + if (relation->rd_pubactions) > + return memcpy(pubactions, relation->rd_pubactions, > + sizeof(PublicationActions)); > + > + /* Fetch the publication membership info. */ > + puboids = GetRelationPublications(RelationGetRelid(relation)); > + puboids = list_concat_unique_oid(puboids, GetAllTablesPublications()); > + > + foreach(lc, puboids) > + { > + Oid pubid = lfirst_oid(lc); > + HeapTuple tup; > + Form_pg_publication pubform; > + > + tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid)); > + > + if (!HeapTupleIsValid(tup)) > + elog(ERROR, "cache lookup failed for publication %u", > pubid); > + > + pubform = (Form_pg_publication) GETSTRUCT(tup); > + > + pubactions->pubinsert |= pubform->pubinsert; > + pubactions->pubupdate |= pubform->pubupdate; > + pubactions->pubdelete |= pubform->pubdelete; > + > + ReleaseSysCache(tup); > + > + /* > + * If we know everything is replicated, there is no point to > check > + * for other publications. > + */ > + if (pubactions->pubinsert && pubactions->pubupdate && > + pubactions->pubdelete) > + break; > + } > + > + if (relation->rd_pubactions) > + { > + pfree(relation->rd_pubactions); > + relation->rd_pubactions = NULL; > + } > + > + /* Now save copy of the actions in the relcache entry. */ > + oldcxt = MemoryContextSwitchTo(CacheMemoryContext); > + relation->rd_pubactions = palloc(sizeof(PublicationActions)); > + memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions)); > + MemoryContextSwitchTo(oldcxt); > + > + return pubactions; > +} Hm. Do we actually have enough cache invalidation support to make this cached version correct? I haven't seen anything in that regard? Seems to mean that all changes to an ALL TABLES publication need to do a global relcache invalidation? 
- Andres -- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers