Hi,

First read through the current version. Hence no real architectural
comments.

On 2016-09-09 00:59:26 +0200, Petr Jelinek wrote:

> diff --git a/src/backend/commands/publicationcmds.c 
> b/src/backend/commands/publicationcmds.c
> new file mode 100644
> index 0000000..e0c719d
> --- /dev/null
> +++ b/src/backend/commands/publicationcmds.c
> @@ -0,0 +1,761 @@
> +/*-------------------------------------------------------------------------
> + *
> + * publicationcmds.c
> + *           publication manipulation
> + *
> + * Copyright (c) 2015, PostgreSQL Global Development Group
> + *
> + * IDENTIFICATION
> + *           publicationcmds.c
>

Not that I'm a fan of this line in the first place, but usually it does
include the path.


> +static void
> +check_replication_permissions(void)
> +{
> +     if (!superuser() && !has_rolreplication(GetUserId()))
> +             ereport(ERROR,
> +                             (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
> +                              (errmsg("must be superuser or replication role 
> to manipulate publications"))));
> +}

Do we want to require owner privileges for replication roles? I'd say
no, but want to raise the question.


> +ObjectAddress
> +CreatePublication(CreatePublicationStmt *stmt)
> +{
> +     Relation        rel;
> +     ObjectAddress myself;
> +     Oid                     puboid;
> +     bool            nulls[Natts_pg_publication];
> +     Datum           values[Natts_pg_publication];
> +     HeapTuple       tup;
> +     bool            replicate_insert_given;
> +     bool            replicate_update_given;
> +     bool            replicate_delete_given;
> +     bool            replicate_insert;
> +     bool            replicate_update;
> +     bool            replicate_delete;
> +
> +     check_replication_permissions();
> +
> +     rel = heap_open(PublicationRelationId, RowExclusiveLock);
> +
> +     /* Check if name is used */
> +     puboid = GetSysCacheOid1(PUBLICATIONNAME, 
> CStringGetDatum(stmt->pubname));
> +     if (OidIsValid(puboid))
> +     {
> +             ereport(ERROR,
> +                             (errcode(ERRCODE_DUPLICATE_OBJECT),
> +                              errmsg("publication \"%s\" already exists",
> +                                             stmt->pubname)));
> +     }
> +
> +     /* Form a tuple. */
> +     memset(values, 0, sizeof(values));
> +     memset(nulls, false, sizeof(nulls));
> +
> +     values[Anum_pg_publication_pubname - 1] =
> +             DirectFunctionCall1(namein, CStringGetDatum(stmt->pubname));
> +
> +     parse_publication_options(stmt->options,
> +                                                       
> &replicate_insert_given, &replicate_insert,
> +                                                       
> &replicate_update_given, &replicate_update,
> +                                                       
> &replicate_delete_given, &replicate_delete);
> +
> +     values[Anum_pg_publication_puballtables - 1] =
> +             BoolGetDatum(stmt->for_all_tables);
> +     values[Anum_pg_publication_pubreplins - 1] =
> +             BoolGetDatum(replicate_insert);
> +     values[Anum_pg_publication_pubreplupd - 1] =
> +             BoolGetDatum(replicate_update);
> +     values[Anum_pg_publication_pubrepldel - 1] =
> +             BoolGetDatum(replicate_delete);
> +
> +     tup = heap_form_tuple(RelationGetDescr(rel), values, nulls);
> +
> +     /* Insert tuple into catalog. */
> +     puboid = simple_heap_insert(rel, tup);
> +     CatalogUpdateIndexes(rel, tup);
> +     heap_freetuple(tup);
> +
> +     ObjectAddressSet(myself, PublicationRelationId, puboid);
> +
> +     /* Make the changes visible. */
> +     CommandCounterIncrement();
> +
> +     if (stmt->tables)
> +     {
> +             List       *rels;
> +
> +             Assert(list_length(stmt->tables) > 0);
> +
> +             rels = GatherTableList(stmt->tables);
> +             PublicationAddTables(puboid, rels, true, NULL);
> +             CloseTables(rels);
> +     }
> +     else if (stmt->for_all_tables || stmt->schema)
> +     {
> +             List       *rels;
> +
> +             rels = GatherTables(stmt->schema);
> +             PublicationAddTables(puboid, rels, true, NULL);
> +             CloseTables(rels);
> +     }

Isn't this (and ALTER) racy? What happens if tables are concurrently
created? This session wouldn't necessarily see the tables, and other
sessions won't see for_all_tables/schema.   Evaluating
for_all_tables/all_in_schema when the publication is used, would solve
that problem.

> +/*
> + * Gather all tables optinally filtered by schema name.
> + * The gathered tables are locked in access share lock mode.
> + */
> +static List *
> +GatherTables(char *nspname)
> +{
> +     Oid                     nspid = InvalidOid;
> +     List       *rels = NIL;
> +     Relation        rel;
> +     SysScanDesc scan;
> +     ScanKeyData key[1];
> +     HeapTuple       tup;
> +
> +     /* Resolve and validate the schema if specified */
> +     if (nspname)
> +     {
> +             nspid = LookupExplicitNamespace(nspname, false);
> +             if (IsSystemNamespace(nspid) || IsToastNamespace(nspid))
> +                     ereport(ERROR,
> +                                     
> (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                                      errmsg("only tables in user schemas 
> can be added to publication"),
> +                                      errdetail("%s is a system schema", 
> strVal(nspname))));
> +     }

Why are we restricting pg_catalog here? There's a bunch of extensions
creating objects therein, and we allow that. Seems better to just rely
on the IsSystemClass check for that below.

> +/*
> + * Gather Relations based o provided by RangeVar list.
> + * The gathered tables are locked in access share lock mode.
> + */

Why access share? Shouldn't we make this ShareUpdateExclusive or
similar, to prevent schema changes?


> +static List *
> +GatherTableList(List *tables)
> +{
> +     List       *relids = NIL;
> +     List       *rels = NIL;
> +     ListCell   *lc;
> +
> +     /*
> +      * Open, share-lock, and check all the explicitly-specified relations
> +      */
> +     foreach(lc, tables)
> +     {
> +             RangeVar   *rv = lfirst(lc);
> +             Relation        rel;
> +             bool            recurse = interpretInhOption(rv->inhOpt);
> +             Oid                     myrelid;
> +
> +             rel = heap_openrv(rv, AccessShareLock);
> +             myrelid = RelationGetRelid(rel);
> +             /* don't throw error for "foo, foo" */
> +             if (list_member_oid(relids, myrelid))
> +             {
> +                     heap_close(rel, AccessShareLock);
> +                     continue;
> +             }
> +             rels = lappend(rels, rel);
> +             relids = lappend_oid(relids, myrelid);
> +
> +             if (recurse)
> +             {
> +                     ListCell   *child;
> +                     List       *children;
> +
> +                     children = find_all_inheritors(myrelid, AccessShareLock,
> +                                                                             
>    NULL);
> +
> +                     foreach(child, children)
> +                     {
> +                             Oid                     childrelid = 
> lfirst_oid(child);
> +
> +                             if (list_member_oid(relids, childrelid))
> +                                     continue;
> +
> +                             /* find_all_inheritors already got lock */
> +                             rel = heap_open(childrelid, NoLock);
> +                             rels = lappend(rels, rel);
> +                             relids = lappend_oid(relids, childrelid);
> +                     }
> +             }
> +     }

Hm, can't this yield duplicates, when both an inherited and a top level
relation are specified?


> @@ -713,6 +714,25 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid 
> ownerId,
>       ObjectAddressSet(address, RelationRelationId, relationId);
>  
>       /*
> +      * If the newly created relation is a table and there are publications
> +      * which were created as FOR ALL TABLES, we want to add the relation
> +      * membership to those publications.
> +      */
> +
> +     if (relkind == RELKIND_RELATION)
> +     {
> +             List       *pubids = GetAllTablesPublications();
> +             ListCell   *lc;
> +
> +             foreach(lc, pubids)
> +             {
> +                     Oid     pubid = lfirst_oid(lc);
> +
> +                     publication_add_relation(pubid, rel, false);
> +             }
> +     }
> +

Hm, this has the potential to noticeably slow down table creation.

> +publication_opt_item:
> +                     IDENT
> +                             {
> +                                     /*
> +                                      * We handle identifiers that aren't 
> parser keywords with
> +                                      * the following special-case codes, to 
> avoid bloating the
> +                                      * size of the main parser.
> +                                      */
> +                                     if (strcmp($1, "replicate_insert") == 0)
> +                                             $$ = 
> makeDefElem("replicate_insert",
> +                                                                             
>  (Node *)makeInteger(TRUE), @1);
> +                                     else if (strcmp($1, 
> "noreplicate_insert") == 0)
> +                                             $$ = 
> makeDefElem("replicate_insert",
> +                                                                             
>  (Node *)makeInteger(FALSE), @1);
> +                                     else if (strcmp($1, "replicate_update") 
> == 0)
> +                                             $$ = 
> makeDefElem("replicate_update",
> +                                                                             
>  (Node *)makeInteger(TRUE), @1);
> +                                     else if (strcmp($1, 
> "noreplicate_update") == 0)
> +                                             $$ = 
> makeDefElem("replicate_update",
> +                                                                             
>  (Node *)makeInteger(FALSE), @1);
> +                                     else if (strcmp($1, "replicate_delete") 
> == 0)
> +                                             $$ = 
> makeDefElem("replicate_delete",
> +                                                                             
>  (Node *)makeInteger(TRUE), @1);
> +                                     else if (strcmp($1, 
> "noreplicate_delete") == 0)
> +                                             $$ = 
> makeDefElem("replicate_delete",
> +                                                                             
>  (Node *)makeInteger(FALSE), @1);
> +                                     else
> +                                             ereport(ERROR,
> +                                                             
> (errcode(ERRCODE_SYNTAX_ERROR),
> +                                                              
> errmsg("unrecognized publication option \"%s\"", $1),
> +                                                                      
> parser_errposition(@1)));
> +                             }
> +             ;

I'm kind of inclined to do this checking at execution (or transform)
time instead.  That allows extension to add options, and handle them in
utility hooks.

> +
> +/* ----------------
> + *           pg_publication_rel definition.  cpp turns this into
> + *           typedef struct FormData_pg_publication_rel
> + *
> + * ----------------
> + */
> +#define PublicationRelRelationId                             6106
> +
> +CATALOG(pg_publication_rel,6106)
> +{
> +     Oid             pubid;                          /* Oid of the 
> publication */
> +     Oid             relid;                          /* Oid of the relation 
> */
> +} FormData_pg_publication_rel;

Hm. Do we really want this to have an oid? Won't that significantly,
especially if multiple publications are present, increase our oid
consumption?  It seems entirely sufficient to identify rows in here
using (pubid, relid).


> +ObjectAddress
> +CreateSubscription(CreateSubscriptionStmt *stmt)
> +{
> +     Relation        rel;
> +     ObjectAddress myself;
> +     Oid                     subid;
> +     bool            nulls[Natts_pg_subscription];
> +     Datum           values[Natts_pg_subscription];
> +     HeapTuple       tup;
> +     bool            enabled_given;
> +     bool            enabled;
> +     char       *conninfo;
> +     List       *publications;
> +
> +     check_subscription_permissions();
> +
> +     rel = heap_open(SubscriptionRelationId, RowExclusiveLock);
> +
> +     /* Check if name is used */
> +     subid = GetSysCacheOid2(SUBSCRIPTIONNAME, MyDatabaseId,
> +                                                     
> CStringGetDatum(stmt->subname));
> +     if (OidIsValid(subid))
> +     {
> +             ereport(ERROR,
> +                             (errcode(ERRCODE_DUPLICATE_OBJECT),
> +                              errmsg("subscription \"%s\" already exists",
> +                                             stmt->subname)));
> +     }
> +
> +     /* Parse and check options. */
> +     parse_subscription_options(stmt->options, &enabled_given, &enabled,
> +                                                        &conninfo, 
> &publications);
> +
> +     /* TODO: improve error messages here. */
> +     if (conninfo == NULL)
> +             ereport(ERROR,
> +                             (errcode(ERRCODE_SYNTAX_ERROR),
> +                              errmsg("connection not specified")));

Probably also makes sense to parse the conninfo here to verify it looks
saen.  Although that's fairly annoying to do, because the relevant code
is libpq :(


> diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
> index 65230e2..f3d54c8 100644
> --- a/src/backend/nodes/copyfuncs.c
> +++ b/src/backend/nodes/copyfuncs.c

I think you might be missing outfuncs support.

> +
> +CATALOG(pg_subscription,6100) BKI_SHARED_RELATION BKI_ROWTYPE_OID(6101) 
> BKI_SCHEMA_MACRO
> +{
> +     Oid                     subdbid;                        /* Database the 
> subscription is in. */
> +     NameData        subname;                /* Name of the subscription */
> +     bool            subenabled;             /* True if the subsription is 
> enabled (running) */

Not sure what "running" means here.

> +#ifdef CATALOG_VARLEN                        /* variable-length fields start 
> here */
> +     text            subconninfo;    /* Connection string to the provider */
> +     NameData        subslotname;    /* Slot name on provider */
> +
> +     name            subpublications[1];     /* List of publications 
> subscribed to */
> +#endif
> +} FormData_pg_subscription;

> +    <varlistentry>
> +     <term>
> +      publication_names
> +     </term>
> +     <listitem>
> +      <para>
> +       Comma separated list of publication names for which to subscribe
> +       (receive changes). See
> +       <xref linkend="logical-replication-publication"> for more info.
> +      </para>
> +     </listitem>
> +    </varlistentry>
> +   </variablelist>

Do we need to specify an escaping scheme here?

> +  <para>
> +   Every DML message contains arbitraty relation id, which can be mapped to

Typo: "arbitraty"


> +<listitem>
> +<para>
> +                Commit timestamp of the transaction.
> +</para>
> +</listitem>
> +</varlistentry>

Perhaps mention it's relative to postgres epoch?



> +<variablelist>
> +<varlistentry>
> +<term>
> +        Byte1('O')
> +</term>
> +<listitem>
> +<para>
> +                Identifies the message as an origin message.
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        Int64
> +</term>
> +<listitem>
> +<para>
> +                The LSN of the commit on the origin server.
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        Int8
> +</term>
> +<listitem>
> +<para>
> +                Length of the origin name (including the NULL-termination
> +                character).
> +</para>
> +</listitem>
> +</varlistentry>

Should this explain that there could be mulitple origin messages (when
replay switched origins during an xact)?

> +<para>
> +                Relation name.
> +</para>
> +</listitem>
> +</varlistentry>
> +</variablelist>
> +
> +</para>
> +
> +<para>
> +This message is always followed by Attributes message.
> +</para>

What's the point of having this separate from the relation message?

> +<varlistentry>
> +<term>
> +        Byte1('C')
> +</term>
> +<listitem>
> +<para>
> +                Start of column block.
> +</para>
> +</listitem>

"block"?

> +</varlistentry><varlistentry>
> +<term>
> +        Int8
> +</term>
> +<listitem>
> +<para>
> +                Flags for the column. Currently can be either 0 for no flags
> +                or one which marks the column as part of the key.
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        Int8
> +</term>
> +<listitem>
> +<para>
> +                Length of column name (including the NULL-termination
> +                character).
> +</para>
> +</listitem>
> +</varlistentry>
> +<varlistentry>
> +<term>
> +        String
> +</term>
> +<listitem>
> +<para>
> +                Name of the column.
> +</para>
> +</listitem>
> +</varlistentry>

Huh, no type information?



> +<varlistentry>
> +<term>
> +        Byte1('O')
> +</term>
> +<listitem>
> +<para>
> +                Identifies the following TupleData message as the old tuple
> +                (deleted tuple).
> +</para>
> +</listitem>
> +</varlistentry>

Should we discern between old key and old tuple?


> +#define IS_REPLICA_IDENTITY  1

Defining this in the c file doesn't seem particularly useful?



> +/*
> + * Read transaction BEGIN from the stream.
> + */
> +void
> +logicalrep_read_begin(StringInfo in, XLogRecPtr *remote_lsn,
> +                                       TimestampTz *committime, 
> TransactionId *remote_xid)
> +{
> +     /* read fields */
> +     *remote_lsn = pq_getmsgint64(in);
> +     Assert(*remote_lsn != InvalidXLogRecPtr);
> +     *committime = pq_getmsgint64(in);
> +     *remote_xid = pq_getmsgint(in, 4);
> +}

In network exposed stuff it seems better not to use assert, and error
out instead.


> +/*
> + * Write UPDATE to the output stream.
> + */
> +void
> +logicalrep_write_update(StringInfo out, Relation rel, HeapTuple oldtuple,
> +                                        HeapTuple newtuple)
> +{
> +     pq_sendbyte(out, 'U');          /* action UPDATE */
> +
> +     /* use Oid as relation identifier */
> +     pq_sendint(out, RelationGetRelid(rel), 4);

Wonder if there's a way that could screw us. What happens if there's an
oid wraparound, and a relation is dropped? Then a new relation could end
up with same id. Maybe answered somewhere further down.


> +/*
> + * Write a tuple to the outputstream, in the most efficient format possible.
> + */
> +static void
> +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple)
> +{

> +     /* Write the values */
> +     for (i = 0; i < desc->natts; i++)
> +     {
> +             outputstr =     OidOutputFunctionCall(typclass->typoutput, 
> values[i]);

Odd spacing.



> +/*
> + * Initialize this plugin
> + */
> +static void
> +pgoutput_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
> +                               bool is_init)
> +{
> +     PGOutputData   *data = palloc0(sizeof(PGOutputData));
> +     int                             client_encoding;
> +
> +     /* Create our memory context for private allocations. */
> +     data->context = AllocSetContextCreate(ctx->context,
> +                                                                             
>   "logical replication output context",
> +                                                                             
>   ALLOCSET_DEFAULT_MINSIZE,
> +                                                                             
>   ALLOCSET_DEFAULT_INITSIZE,
> +                                                                             
>   ALLOCSET_DEFAULT_MAXSIZE);
> +
> +     ctx->output_plugin_private = data;
> +
> +     /*
> +      * This is replication start and not slot initialization.
> +      *
> +      * Parse and validate options passed by the client.
> +      */
> +     if (!is_init)
> +     {
> +             /* We can only do binary */
> +             if (opt->output_type != OUTPUT_PLUGIN_BINARY_OUTPUT)
> +                     ereport(ERROR,
> +                                     
> (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
> +                                      errmsg("only binary mode is supported 
> for logical replication protocol")));

Shouldn't you just set
                                opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
or is the goal just to output a better message?

> +
> +/*
> + * COMMIT callback
> + */
> +static void
> +pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> +                                      XLogRecPtr commit_lsn)
> +{
> +     OutputPluginPrepareWrite(ctx, true);
> +     logicalrep_write_commit(ctx->out, txn, commit_lsn);
> +     OutputPluginWrite(ctx, true);
> +}

Hm, so we don't reset the context for these...

> +/*
> + * Sends the decoded DML over wire.
> + */
> +static void
> +pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
> +                             Relation relation, ReorderBufferChange *change)
> +{

> +     /* Avoid leaking memory by using and resetting our own context */
> +     old = MemoryContextSwitchTo(data->context);
> +
> +     /*
> +      * Write the relation schema if the current schema haven't been sent 
> yet.
> +      */
> +     if (!relentry->schema_sent)
> +     {
> +             OutputPluginPrepareWrite(ctx, false);
> +             logicalrep_write_rel(ctx->out, relation);
> +             OutputPluginWrite(ctx, false);
> +             relentry->schema_sent = true;
> +     }
> +
> +     /* Send the data */
> +     switch (change->action)
> +     {
...
> +     /* Cleanup */
> +     MemoryContextSwitchTo(old);
> +     MemoryContextReset(data->context);
> +}

IIRC there were some pfree's in called functions. It's probably better
to remove those and rely on this.

> +/*
> + * Load publications from the list of publication names.
> + */
> +static List *
> +LoadPublications(List *pubnames)
> +{
> +     List       *result = NIL;
> +     ListCell   *lc;
> +
> +     foreach (lc, pubnames)
> +     {
> +             char               *pubname = (char *) lfirst(lc);
> +             Publication        *pub = GetPublicationByName(pubname, false);
> +
> +             result = lappend(result, pub);
> +     }
> +
> +     return result;
> +}

Why are we doing this eagerly? On systems with a lot of relations
this'll suck up a fair amount of memory, without much need?

> +/*
> + * Remove all the entries from our relation cache.
> + */
> +static void
> +destroy_rel_sync_cache(void)
> +{
> +     HASH_SEQ_STATUS         status;
> +     RelationSyncEntry  *entry;
> +
> +     if (RelationSyncCache == NULL)
> +             return;
> +
> +     hash_seq_init(&status, RelationSyncCache);
> +
> +     while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
> +     {
> +             if (hash_search(RelationSyncCache, (void *) &entry->relid,
> +                                             HASH_REMOVE, NULL) == NULL)
> +                     elog(ERROR, "hash table corrupted");
> +     }
> +
> +     RelationSyncCache = NULL;
> +}

Any reason not to just destroy the hash table instead?





> +enum {
> +     PARAM_UNRECOGNISED,
> +     PARAM_PROTOCOL_VERSION,
> +     PARAM_ENCODING,
> +     PARAM_PG_VERSION,
> +     PARAM_PUBLICATION_NAMES,
> +} OutputPluginParamKey;
> +
> +typedef struct {
> +     const char * const paramname;
> +     int     paramkey;
> +} OutputPluginParam;
> +
> +/* Oh, if only C had switch on strings */
> +static OutputPluginParam param_lookup[] = {
> +     {"proto_version", PARAM_PROTOCOL_VERSION},
> +     {"encoding", PARAM_ENCODING},
> +     {"pg_version", PARAM_PG_VERSION},
> +     {"publication_names", PARAM_PUBLICATION_NAMES},
> +     {NULL, PARAM_UNRECOGNISED}
> +};
> +
> +
> +/*
> + * Read parameters sent by client at startup and store recognised
> + * ones in the parameters PGOutputData.
> + *
> + * The data must have all client-supplied parameter fields zeroed,
> + * such as by memset or palloc0, since values not supplied
> + * by the client are not set.
> + */
> +void
> +pgoutput_process_parameters(List *options, PGOutputData *data)
> +{
> +     ListCell        *lc;
> +
> +     /* Examine all the other params in the message. */
> +     foreach(lc, options)
> +     {
> +             DefElem    *elem = lfirst(lc);
> +             Datum           val;
> +
> +             Assert(elem->arg == NULL || IsA(elem->arg, String));
> +
> +             /* Check each param, whether or not we recognise it */
> +             switch(get_param_key(elem->defname))
> +             {
> +                     case PARAM_PROTOCOL_VERSION:
> +                             val = get_param_value(elem, 
> OUTPUT_PARAM_TYPE_UINT32, false);
> +                             data->protocol_version = DatumGetUInt32(val);
> +                             break;
> +
> +                     case PARAM_ENCODING:
> +                             val = get_param_value(elem, 
> OUTPUT_PARAM_TYPE_STRING, false);
> +                             data->client_encoding = DatumGetCString(val);
> +                             break;
> +
> +                     case PARAM_PG_VERSION:
> +                             val = get_param_value(elem, 
> OUTPUT_PARAM_TYPE_UINT32, false);
> +                             data->client_pg_version = DatumGetUInt32(val);
> +                             break;
> +
> +                     case PARAM_PUBLICATION_NAMES:
> +                             val = get_param_value(elem, 
> OUTPUT_PARAM_TYPE_STRING, false);
> +                             if 
> (!SplitIdentifierString(DatumGetCString(val), ',',
> +                                                                             
>    &data->publication_names))
> +                                     ereport(ERROR,
> +                                                     
> (errcode(ERRCODE_INVALID_NAME),
> +                                                      errmsg("invalid 
> publication name syntax")));
> +
> +                             break;
> +
> +                     default:
> +                             ereport(ERROR,
> +                                             (errmsg("Unrecognised pgoutput 
> parameter %s",
> +                                                             
> elem->defname)));
> +                             break;
> +             }
> +     }
> +}
> +
> +/*
> + * Look up a param name to find the enum value for the
> + * param, or PARAM_UNRECOGNISED if not found.
> + */
> +static int
> +get_param_key(const char * const param_name)
> +{
> +     OutputPluginParam *param = &param_lookup[0];
> +
> +     do {
> +             if (strcmp(param->paramname, param_name) == 0)
> +                     return param->paramkey;
> +             param++;
> +     } while (param->paramname != NULL);
> +
> +     return PARAM_UNRECOGNISED;
> +}

I'm not following why this isn't just one routine with a chain of
else if (strmcp() == 0)
blocks?


> From 2241471aec03de553126c2d5fc012fcba1ecf50d Mon Sep 17 00:00:00 2001
> From: Petr Jelinek <pjmo...@pjmodos.net>
> Date: Wed, 6 Jul 2016 13:59:23 +0200
> Subject: [PATCH 4/6] Make libpqwalreceiver reentrant
> 
> ---
>  .../libpqwalreceiver/libpqwalreceiver.c            | 328 
> ++++++++++++++-------
>  src/backend/replication/walreceiver.c              |  67 +++--
>  src/include/replication/walreceiver.h              |  75 +++--
>  3 files changed, 306 insertions(+), 164 deletions(-)
> 
> diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c 
> b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> index f1c843e..5da4474 100644
> --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
> @@ -25,6 +25,7 @@
>  #include "miscadmin.h"
>  #include "replication/walreceiver.h"
>  #include "utils/builtins.h"
> +#include "utils/pg_lsn.h"
>  
>  #ifdef HAVE_POLL_H
>  #include <poll.h>
> @@ -38,62 +39,83 @@
>  
>  PG_MODULE_MAGIC;
>  
> -void         _PG_init(void);
> +struct WalReceiverConnHandle {
> +     /* Current connection to the primary, if any */
> +     PGconn *streamConn;
> +     /* Buffer for currently read records */
> +     char   *recvBuf;
> +};

newline before {

> -/* Current connection to the primary, if any */
> -static PGconn *streamConn = NULL;
> -
> -/* Buffer for currently read records */
> -static char *recvBuf = NULL;

Yuck, this indeed seems better.

>  
>  /*
> - * Module load callback
> + * Module initialization callback
>   */
> -void
> -_PG_init(void)
> +WalReceiverConnHandle *
> +_PG_walreceirver_conn_init(WalReceiverConnAPI *wrcapi)
>  {
> -     /* Tell walreceiver how to reach us */
> -     if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
> -             walrcv_readtimelinehistoryfile != NULL ||
> -             walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
> -             walrcv_receive != NULL || walrcv_send != NULL ||
> -             walrcv_disconnect != NULL)
> -             elog(ERROR, "libpqwalreceiver already loaded");
> -     walrcv_connect = libpqrcv_connect;
> -     walrcv_get_conninfo = libpqrcv_get_conninfo;
> -     walrcv_identify_system = libpqrcv_identify_system;
> -     walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
> -     walrcv_startstreaming = libpqrcv_startstreaming;
> -     walrcv_endstreaming = libpqrcv_endstreaming;
> -     walrcv_receive = libpqrcv_receive;
> -     walrcv_send = libpqrcv_send;
> -     walrcv_disconnect = libpqrcv_disconnect;
> +     WalReceiverConnHandle *handle;
> +
> +     handle = palloc0(sizeof(WalReceiverConnHandle));
> +
> +     /* Tell caller how to reach us */
> +     wrcapi->connect = libpqrcv_connect;
> +     wrcapi->get_conninfo = libpqrcv_get_conninfo;
> +     wrcapi->identify_system = libpqrcv_identify_system;
> +     wrcapi->readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
> +     wrcapi->create_slot = libpqrcv_create_slot;
> +     wrcapi->startstreaming_physical = libpqrcv_startstreaming_physical;
> +     wrcapi->startstreaming_logical = libpqrcv_startstreaming_logical;
> +     wrcapi->endstreaming = libpqrcv_endstreaming;
> +     wrcapi->receive = libpqrcv_receive;
> +     wrcapi->send = libpqrcv_send;
> +     wrcapi->disconnect = libpqrcv_disconnect;
> +
> +     return handle;
>  }

This however I'm not following. Why do we need multiple copies of this?
And why aren't we doing the assignments in _PG_init?  Seems better to
just allocate one WalRcvCalllbacks globally and assign all these as
constants.  Then the establishment function can just return all these
(as part of a bigger struct).


(skipped logical rep docs)

> diff --git a/doc/src/sgml/reference.sgml b/doc/src/sgml/reference.sgml
> index 8acdff1..34007d3 100644
> --- a/doc/src/sgml/reference.sgml
> +++ b/doc/src/sgml/reference.sgml
> @@ -54,11 +54,13 @@
>     &alterOperatorClass;
>     &alterOperatorFamily;
>     &alterPolicy;
> +   &alterPublication;
>     &alterRole;
>     &alterRule;
>     &alterSchema;
>     &alterSequence;
>     &alterServer;
> +   &alterSubscription;
>     &alterSystem;
>     &alterTable;
>     &alterTableSpace;
> @@ -100,11 +102,13 @@
>     &createOperatorClass;
>     &createOperatorFamily;
>     &createPolicy;
> +   &createPublication;
>     &createRole;
>     &createRule;
>     &createSchema;
>     &createSequence;
>     &createServer;
> +   &createSubscription;
>     &createTable;
>     &createTableAs;
>     &createTableSpace;
> @@ -144,11 +148,13 @@
>     &dropOperatorFamily;
>     &dropOwned;
>     &dropPolicy;
> +   &dropPublication;
>     &dropRole;
>     &dropRule;
>     &dropSchema;
>     &dropSequence;
>     &dropServer;
> +   &dropSubscription;
>     &dropTable;
>     &dropTableSpace;
>     &dropTSConfig;

Hm, shouldn't all these have been registered in the earlier patch?



> diff --git a/src/backend/commands/subscriptioncmds.c 
> b/src/backend/commands/subscriptioncmds.c
> index d29d3f9..f2052b8 100644
> --- a/src/backend/commands/subscriptioncmds.c
> +++ b/src/backend/commands/subscriptioncmds.c

This sure is a lot of yanking around of previously added code.  At least
some of it looks like it should really have been part of the earlier
commit.


> @@ -327,6 +431,18 @@ DropSubscriptionById(Oid subid)
>  {
>       Relation        rel;
>       HeapTuple       tup;
> +     Datum           datum;
> +     bool            isnull;
> +     char       *subname;
> +     char       *conninfo;
> +     char       *slotname;
> +     RepOriginId     originid;
> +     MemoryContext                   tmpctx,
> +                                                     oldctx;
> +     WalReceiverConnHandle  *wrchandle = NULL;
> +     WalReceiverConnAPI         *wrcapi = NULL;
> +     walrcvconn_init_fn              walrcvconn_init;
> +     LogicalRepWorker           *worker;
>  
>       check_subscription_permissions();
>  
> @@ -337,9 +453,135 @@ DropSubscriptionById(Oid subid)
>       if (!HeapTupleIsValid(tup))
>               elog(ERROR, "cache lookup failed for subscription %u", subid);
>  
> +     /*
> +      * Create temporary memory context to keep copy of subscription
> +      * info needed later in the execution.
> +      */
> +     tmpctx = AllocSetContextCreate(TopMemoryContext,
> +                                                                             
>   "DropSubscription Ctx",
> +                                                                             
>   ALLOCSET_DEFAULT_MINSIZE,
> +                                                                             
>   ALLOCSET_DEFAULT_INITSIZE,
> +                                                                             
>   ALLOCSET_DEFAULT_MAXSIZE);
> +     oldctx = MemoryContextSwitchTo(tmpctx);
> +
> +     /* Get subname */
> +     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
> +                                                     
> Anum_pg_subscription_subname, &isnull);
> +     Assert(!isnull);
> +     subname = pstrdup(NameStr(*DatumGetName(datum)));
> +
> +     /* Get conninfo */
> +     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
> +                                                     
> Anum_pg_subscription_subconninfo, &isnull);
> +     Assert(!isnull);
> +     conninfo = pstrdup(TextDatumGetCString(datum));
> +
> +     /* Get slotname */
> +     datum = SysCacheGetAttr(SUBSCRIPTIONOID, tup,
> +                                                     
> Anum_pg_subscription_subslotname, &isnull);
> +     Assert(!isnull);
> +     slotname = pstrdup(NameStr(*DatumGetName(datum)));
> +
> +     MemoryContextSwitchTo(oldctx);
> +
> +     /* Remove the tuple from catalog. */
>       simple_heap_delete(rel, &tup->t_self);
>  
> -     ReleaseSysCache(tup);
> +     /* Protect against launcher restarting the worker. */
> +     LWLockAcquire(LogicalRepLauncherLock, LW_EXCLUSIVE);
>  
> -     heap_close(rel, RowExclusiveLock);
> +     /* Kill the apply worker so that the slot becomes accessible. */
> +     LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +     worker = logicalrep_worker_find(subid);
> +     if (worker)
> +             logicalrep_worker_stop(worker);
> +     LWLockRelease(LogicalRepWorkerLock);
> +
> +     /* Wait for apply process to die. */
> +     for (;;)
> +     {
> +             int     rc;
> +
> +             CHECK_FOR_INTERRUPTS();
> +
> +             LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
> +             if (logicalrep_worker_count(subid) < 1)
> +             {
> +                     LWLockRelease(LogicalRepWorkerLock);
> +                     break;
> +             }
> +             LWLockRelease(LogicalRepWorkerLock);
> +
> +             /* Wait for more work. */
> +             rc = WaitLatch(&MyProc->procLatch,
> +                                        WL_LATCH_SET | WL_TIMEOUT | 
> WL_POSTMASTER_DEATH,
> +                                        1000L);
> +
> +             /* emergency bailout if postmaster has died */
> +             if (rc & WL_POSTMASTER_DEATH)
> +                     proc_exit(1);
> +
> +             ResetLatch(&MyProc->procLatch);
> +     }

I'm really far from convinced this is the right layer to perform these
operations.  Previously these routines were low level catalog
manipulation routines. Now they're certainly not.


> +     /* Remove the origin trakicking. */

typo.



> +     /*
> +      * Now that the catalog update is done, try to reserve slot at the
> +      * provider node using replication connection.
> +      */
> +     wrcapi = palloc0(sizeof(WalReceiverConnAPI));
> +
> +     walrcvconn_init = (walrcvconn_init_fn)
> +             load_external_function("libpqwalreceiver",
> +                                                        
> "_PG_walreceirver_conn_init", false, NULL);
> +
> +     if (walrcvconn_init == NULL)
> +             elog(ERROR, "libpqwalreceiver does not declare 
> _PG_walreceirver_conn_init symbol");

This does rather reinforce my opinion that the _PG_init removal in
libpqwalreceiver isn't useful.

> diff --git a/src/backend/postmaster/bgworker.c 
> b/src/backend/postmaster/bgworker.c
> index 699c934..fc998cd 100644
> --- a/src/backend/postmaster/bgworker.c
> +++ b/src/backend/postmaster/bgworker.c
> @@ -93,6 +93,9 @@ struct BackgroundWorkerHandle
>  
>  static BackgroundWorkerArray *BackgroundWorkerData;
>  
> +/* Enables registration of internal background workers. */
> +bool internal_bgworker_registration_in_progress = false;
> +
>  /*
>   * Calculate shared memory needed.
>   */
> @@ -745,7 +748,8 @@ RegisterBackgroundWorker(BackgroundWorker *worker)
>               ereport(DEBUG1,
>                (errmsg("registering background worker \"%s\"", 
> worker->bgw_name)));
>  
> -     if (!process_shared_preload_libraries_in_progress)
> +     if (!process_shared_preload_libraries_in_progress &&
> +             !internal_bgworker_registration_in_progress)
>       {
>               if (!IsUnderPostmaster)
>                       ereport(LOG,

Ugh.




>  /*
> + * Register internal background workers.
> + *
> + * This is here mainly because the permanent bgworkers are normally allowed
> + * to be registered only when share preload libraries are loaded which does
> + * not work for the internal ones.
> + */
> +static void
> +register_internal_bgworkers(void)
> +{
> +     internal_bgworker_registration_in_progress = true;
> +
> +     /* Register the logical replication worker launcher if appropriate. */
> +     if (!IsBinaryUpgrade && max_logical_replication_workers > 0)
> +     {
> +             BackgroundWorker bgw;
> +
> +             bgw.bgw_flags = BGWORKER_SHMEM_ACCESS |
> +                     BGWORKER_BACKEND_DATABASE_CONNECTION;
> +             bgw.bgw_start_time = BgWorkerStart_RecoveryFinished;
> +             bgw.bgw_main = ApplyLauncherMain;
> +             snprintf(bgw.bgw_name, BGW_MAXLEN,
> +                              "logical replication launcher");
> +             bgw.bgw_restart_time = 5;
> +             bgw.bgw_notify_pid = 0;
> +             bgw.bgw_main_arg = (Datum) 0;
> +
> +             RegisterBackgroundWorker(&bgw);
> +     }
> +
> +     internal_bgworker_registration_in_progress = false;
> +}

Who says these flags are right for everyone?  If we indeed want to go
through bgworkers here, I think you'll have to generallize this a bit,
so we don't check for max_logical_replication_workers and such here.  We
could e.g. have the shared memory sizing hooks set up a chain of
registrations.



> -static void
> +static char *
>  libpqrcv_identify_system(WalReceiverConnHandle *handle,
> -                                              TimeLineID *primary_tli)
> +                                              TimeLineID *primary_tli,
> +                                              char **dbname)
>  {
> +     char       *sysid;
>       PGresult   *res;
> -     char       *primary_sysid;
> -     char            standby_sysid[32];
>  
>       /*
>        * Get the system identifier and timeline ID as a DataRow message from 
> the
> @@ -231,24 +234,19 @@ libpqrcv_identify_system(WalReceiverConnHandle *handle,
>                                errdetail("Could not identify system: got %d 
> rows and %d fields, expected %d rows and %d or more fields.",
>                                                  ntuples, nfields, 3, 1)));
>       }
> -     primary_sysid = PQgetvalue(res, 0, 0);
> +     sysid = pstrdup(PQgetvalue(res, 0, 0));
>       *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
> -
> -     /*
> -      * Confirm that the system identifier of the primary is the same as 
> ours.
> -      */
> -     snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
> -                      GetSystemIdentifier());
> -     if (strcmp(primary_sysid, standby_sysid) != 0)
> +     if (dbname)
>       {
> -             primary_sysid = pstrdup(primary_sysid);
> -             PQclear(res);
> -             ereport(ERROR,
> -                             (errmsg("database system identifier differs 
> between the primary and standby"),
> -                              errdetail("The primary's identifier is %s, the 
> standby's identifier is %s.",
> -                                                primary_sysid, 
> standby_sysid)));
> +             if (PQgetisnull(res, 0, 3))
> +                     *dbname = NULL;
> +             else
> +                     *dbname = pstrdup(PQgetvalue(res, 0, 3));
>       }
> +
>       PQclear(res);
> +
> +     return sysid;
>  }
>  
>  /*
> @@ -274,7 +272,7 @@ libpqrcv_create_slot(WalReceiverConnHandle *handle, char 
> *slotname,
>  
>       if (PQresultStatus(res) != PGRES_TUPLES_OK)
>       {
> -             elog(FATAL, "could not crate replication slot \"%s\": %s\n",
> +             elog(ERROR, "could not crate replication slot \"%s\": %s\n",
>                        slotname, PQerrorMessage(handle->streamConn));
>       }
>  
> @@ -287,6 +285,28 @@ libpqrcv_create_slot(WalReceiverConnHandle *handle, char 
> *slotname,
>       return snapshot;
>  }
>  
> +/*
> + * Drop replication slot.
> + */
> +static void
> +libpqrcv_drop_slot(WalReceiverConnHandle *handle, char *slotname)
> +{
> +     PGresult           *res;
> +     char                    cmd[256];
> +
> +     snprintf(cmd, sizeof(cmd),
> +                      "DROP_REPLICATION_SLOT \"%s\"", slotname);
> +
> +     res = libpqrcv_PQexec(handle, cmd);
> +
> +     if (PQresultStatus(res) != PGRES_COMMAND_OK)
> +     {
> +             elog(ERROR, "could not drop replication slot \"%s\": %s\n",
> +                      slotname, PQerrorMessage(handle->streamConn));
> +     }
> +
> +     PQclear(res);
> +}


Given that the earlier commit to libpqwalreciever added a lot of this
information, it doesn't seem right to change it again here.



> +typedef struct LogicalRepRelMapEntry {

early {


Ok, running out of time. See you soon I guess ;)

Andres


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to