On 2016-12-10 08:48:55 +0100, Petr Jelinek wrote:

> diff --git a/src/backend/catalog/pg_publication.c 
> b/src/backend/catalog/pg_publication.c
> new file mode 100644
> index 0000000..e3560b7
> --- /dev/null
> +++ b/src/backend/catalog/pg_publication.c
> +
> +Datum pg_get_publication_tables(PG_FUNCTION_ARGS);

Don't we usually put these in a header?

> +/*
> + * Insert new publication / relation mapping.
> + */
> +ObjectAddress
> +publication_add_relation(Oid pubid, Relation targetrel,
> +                                              bool if_not_exists)
> +{
> +     Relation        rel;
> +     HeapTuple       tup;
> +     Datum           values[Natts_pg_publication_rel];
> +     bool            nulls[Natts_pg_publication_rel];
> +     Oid                     relid = RelationGetRelid(targetrel);
> +     Oid                     prrelid;
> +     Publication *pub = GetPublication(pubid);
> +     ObjectAddress   myself,
> +                                     referenced;
> +
> +     rel = heap_open(PublicationRelRelationId, RowExclusiveLock);
> +
> +     /* Check for duplicates */

Maybe mention that that check is racy, but a unique index protects
against the race?


> +     /* Insert tuple into catalog. */
> +     prrelid = simple_heap_insert(rel, tup);
> +     CatalogUpdateIndexes(rel, tup);
> +     heap_freetuple(tup);
> +
> +     ObjectAddressSet(myself, PublicationRelRelationId, prrelid);
> +
> +     /* Add dependency on the publication */
> +     ObjectAddressSet(referenced, PublicationRelationId, pubid);
> +     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
> +
> +     /* Add dependency on the relation */
> +     ObjectAddressSet(referenced, RelationRelationId, relid);
> +     recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO);
> +
> +     /* Close the table. */
> +     heap_close(rel, RowExclusiveLock);

I'm not quite sure abou the policy, but shouldn't we invoke
InvokeObjectPostCreateHook etc here?


> +/*
> + * Gets list of relation oids for a publication.
> + *
> + * This should only be used for normal publications, the FOR ALL TABLES
> + * should use GetAllTablesPublicationRelations().
> + */
> +List *
> +GetPublicationRelations(Oid pubid)
> +{
> +     List               *result;
> +     Relation                pubrelsrel;
> +     ScanKeyData             scankey;
> +     SysScanDesc             scan;
> +     HeapTuple               tup;
> +
> +     /* Find all publications associated with the relation. */
> +     pubrelsrel = heap_open(PublicationRelRelationId, AccessShareLock);
> +
> +     ScanKeyInit(&scankey,
> +                             Anum_pg_publication_rel_prpubid,
> +                             BTEqualStrategyNumber, F_OIDEQ,
> +                             ObjectIdGetDatum(pubid));
> +
> +     scan = systable_beginscan(pubrelsrel, PublicationRelMapIndexId, true,
> +                                                       NULL, 1, &scankey);
> +
> +     result = NIL;
> +     while (HeapTupleIsValid(tup = systable_getnext(scan)))
> +     {
> +             Form_pg_publication_rel         pubrel;
> +
> +             pubrel = (Form_pg_publication_rel) GETSTRUCT(tup);
> +
> +             result = lappend_oid(result, pubrel->prrelid);
> +     }
> +
> +     systable_endscan(scan);
> +     heap_close(pubrelsrel, NoLock);

In other parts of this you drop the lock, but not here?


> +     heap_close(rel, NoLock);
> +
> +     return result;
> +}

and here.


> +/*
> + * Gets list of all relation published by FOR ALL TABLES publication(s).
> + */
> +List *
> +GetAllTablesPublicationRelations(void)
> +{
> +     Relation        classRel;
> +     ScanKeyData key[1];
> +     HeapScanDesc scan;
> +     HeapTuple       tuple;
> +     List       *result = NIL;
> +
> +     classRel = heap_open(RelationRelationId, AccessShareLock);

> +     heap_endscan(scan);
> +     heap_close(classRel, AccessShareLock);
> +
> +     return result;
> +}

but here.


Btw, why are matviews not publishable?

> +/*
> + * Get Publication using name.
> + */
> +Publication *
> +GetPublicationByName(const char *pubname, bool missing_ok)
> +{
> +     Oid                     oid;
> +
> +     oid = GetSysCacheOid1(PUBLICATIONNAME, CStringGetDatum(pubname));
> +     if (!OidIsValid(oid))
> +     {
> +             if (missing_ok)
> +                     return NULL;
> +
> +             ereport(ERROR,
> +                             (errcode(ERRCODE_UNDEFINED_OBJECT),
> +                              errmsg("publication \"%s\" does not exist", 
> pubname)));
> +     }
> +
> +     return GetPublication(oid);
> +}

That's racy... Also, shouldn't we specify for how to deal with the
returned memory for Publication * returning methods?



> diff --git a/src/backend/commands/publicationcmds.c 
> b/src/backend/commands/publicationcmds.c
> new file mode 100644
> index 0000000..954b2bd
> --- /dev/null
> +++ b/src/backend/commands/publicationcmds.c
> @@ -0,0 +1,613 @@

> +/*
> + * Create new publication.
> + */
> +ObjectAddress
> +CreatePublication(CreatePublicationStmt *stmt)
> +{
> +     Relation        rel;

> +
> +     values[Anum_pg_publication_puballtables - 1] =
> +             BoolGetDatum(stmt->for_all_tables);
> +     values[Anum_pg_publication_pubinsert - 1] =
> +             BoolGetDatum(publish_insert);
> +     values[Anum_pg_publication_pubupdate - 1] =
> +             BoolGetDatum(publish_update);
> +     values[Anum_pg_publication_pubdelete - 1] =
> +             BoolGetDatum(publish_delete);

I remain convinced that a different representation would be
better. There'll be more options over time (truncate, DDL at least).


> +static void
> +AlterPublicationOptions(AlterPublicationStmt *stmt, Relation rel,
> +                                        HeapTuple tup)
> +{
> +     bool            publish_insert_given;
> +     bool            publish_update_given;
> +     bool            publish_delete_given;
> +     bool            publish_insert;
> +     bool            publish_update;
> +     bool            publish_delete;
> +     ObjectAddress           obj;
> +
> +     parse_publication_options(stmt->options,
> +                                                       
> &publish_insert_given, &publish_insert,
> +                                                       
> &publish_update_given, &publish_update,
> +                                                       
> &publish_delete_given, &publish_delete);

You could pass it a struct instead...


> +static List *
> +OpenTableList(List *tables)
> +{
> +     List       *relids = NIL;
> +     List       *rels = NIL;
> +     ListCell   *lc;
> +
> +     /*
> +      * Open, share-lock, and check all the explicitly-specified relations
> +      */
> +     foreach(lc, tables)
> +     {
> +             RangeVar   *rv = lfirst(lc);
> +             Relation        rel;
> +             bool            recurse = interpretInhOption(rv->inhOpt);
> +             Oid                     myrelid;
> +
> +             rel = heap_openrv(rv, ShareUpdateExclusiveLock);
> +             myrelid = RelationGetRelid(rel);
> +             /* filter out duplicates when user specifies "foo, foo" */
> +             if (list_member_oid(relids, myrelid))
> +             {
> +                     heap_close(rel, ShareUpdateExclusiveLock);
> +                     continue;
> +             }

This is a quadratic algorithm - that could bite us... Not sure if we
need to care.  If we want to fix it, one approach owuld be to use
RangeVarGetRelid() instead, and then do a qsort/deduplicate before
actually opening the relations.

>  
> -def_elem:    ColLabel '=' def_arg
> +def_elem:    def_key '=' def_arg
>                               {
>                                       $$ = makeDefElem($1, (Node *) $3, @1);
>                               }
> -                     | ColLabel
> +                     | def_key
>                               {
>                                       $$ = makeDefElem($1, NULL, @1);
>                               }
>               ;

> +def_key:
> +                     ColLabel                                                
> { $$ = $1; }
> +                     | ColLabel ColLabel                             { $$ = 
> psprintf("%s %s", $1, $2); }
> +             ;
> +

Not quite sure what this is about?  Doesn't that change the accepted
syntax in a bunch of places?


> @@ -2337,6 +2338,8 @@ RelationDestroyRelation(Relation relation, bool 
> remember_tupdesc)
>       bms_free(relation->rd_indexattr);
>       bms_free(relation->rd_keyattr);
>       bms_free(relation->rd_idattr);
> +     if (relation->rd_pubactions)
> +             pfree(relation->rd_pubactions);
>       if (relation->rd_options)
>               pfree(relation->rd_options);
>       if (relation->rd_indextuple)
> @@ -4992,6 +4995,67 @@ RelationGetExclusionInfo(Relation indexRelation,
>       MemoryContextSwitchTo(oldcxt);
>  }
>  
> +/*
> + * Get publication actions for the given relation.
> + */
> +struct PublicationActions *
> +GetRelationPublicationActions(Relation relation)
> +{
> +     List       *puboids;
> +     ListCell   *lc;
> +     MemoryContext           oldcxt;
> +     PublicationActions *pubactions = palloc0(sizeof(PublicationActions));
> +
> +     if (relation->rd_pubactions)
> +             return memcpy(pubactions, relation->rd_pubactions,
> +                                       sizeof(PublicationActions));
> +
> +     /* Fetch the publication membership info. */
> +     puboids = GetRelationPublications(RelationGetRelid(relation));
> +     puboids = list_concat_unique_oid(puboids, GetAllTablesPublications());
> +
> +     foreach(lc, puboids)
> +     {
> +             Oid                     pubid = lfirst_oid(lc);
> +             HeapTuple       tup;
> +             Form_pg_publication pubform;
> +
> +             tup = SearchSysCache1(PUBLICATIONOID, ObjectIdGetDatum(pubid));
> +
> +             if (!HeapTupleIsValid(tup))
> +                     elog(ERROR, "cache lookup failed for publication %u", 
> pubid);
> +
> +             pubform = (Form_pg_publication) GETSTRUCT(tup);
> +
> +             pubactions->pubinsert |= pubform->pubinsert;
> +             pubactions->pubupdate |= pubform->pubupdate;
> +             pubactions->pubdelete |= pubform->pubdelete;
> +
> +             ReleaseSysCache(tup);
> +
> +             /*
> +              * If we know everything is replicated, there is no point to 
> check
> +              * for other publications.
> +              */
> +             if (pubactions->pubinsert && pubactions->pubupdate &&
> +                     pubactions->pubdelete)
> +                     break;
> +     }
> +
> +     if (relation->rd_pubactions)
> +     {
> +             pfree(relation->rd_pubactions);
> +             relation->rd_pubactions = NULL;
> +     }
> +
> +     /* Now save copy of the actions in the relcache entry. */
> +     oldcxt = MemoryContextSwitchTo(CacheMemoryContext);
> +     relation->rd_pubactions = palloc(sizeof(PublicationActions));
> +     memcpy(relation->rd_pubactions, pubactions, sizeof(PublicationActions));
> +     MemoryContextSwitchTo(oldcxt);
> +
> +     return pubactions;
> +}


Hm. Do we actually have enough cache invalidation support to make this
cached version correct?  I haven't seen anything in that regard? Seems
to mean that all changes to an ALL TABLES publication need to do a
global relcache invalidation?

- Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to