Hi, I took the latest posted patch, rebased on current sources, fixed the conflicts, and pgindented. No further changes. Here's the result. All tests are passing for me. Some review comments that were posted have not been addressed yet; I'll look into that soon.
-- Álvaro Herrera PostgreSQL Developer — https://www.EnterpriseDB.com/ "Java is clearly an example of money oriented programming" (A. Stepanov)
>From bb01d00a9ee8f19bc1d9c36bde6cd0ca178c859c Mon Sep 17 00:00:00 2001 From: Alvaro Herrera <alvhe...@alvh.no-ip.org> Date: Mon, 6 Sep 2021 10:34:29 -0300 Subject: [PATCH v6] Add column filtering to logical replication Add capability to specify column names while linking the table to a publication, at the time of CREATE or ALTER publication. This will allow replicating only the specified columns. Other columns, if any, on the subscriber will be populated locally or NULL will be inserted if no value is supplied for the column by the upstream during INSERT. This facilitates replication to a table on subscriber containing only the subscribed/filtered columns. If no filter is specified, all the columns are replicated. REPLICA IDENTITY columns are always replicated. Thus, prohibit adding relation to publication, if column filters do not contain REPLICA IDENTITY. Add a TAP test for the same in src/test/subscription. Author: Rahila Syed <rahilasye...@gmail.com> Discussion: https://postgr.es/m/CAH2L28vddB_NFdRVpuyRBJEBWjz4BSyTB=_ektnrh8nj1jf...@mail.gmail.com --- src/backend/access/common/relation.c | 22 +++++ src/backend/catalog/pg_publication.c | 58 ++++++++++- src/backend/commands/publicationcmds.c | 8 +- src/backend/nodes/copyfuncs.c | 1 + src/backend/nodes/equalfuncs.c | 1 + src/backend/parser/gram.y | 30 +++++- src/backend/replication/logical/proto.c | 103 ++++++++++++++++---- src/backend/replication/logical/tablesync.c | 101 +++++++++++++++++-- src/backend/replication/pgoutput/pgoutput.c | 77 ++++++++++++--- src/include/catalog/pg_publication.h | 1 + src/include/catalog/pg_publication_rel.h | 4 + src/include/nodes/parsenodes.h | 1 + src/include/replication/logicalproto.h | 6 +- src/include/utils/rel.h | 1 + 14 files changed, 365 insertions(+), 49 deletions(-) diff --git a/src/backend/access/common/relation.c b/src/backend/access/common/relation.c index 632d13c1ea..05d6fcba26 100644 --- a/src/backend/access/common/relation.c +++ 
b/src/backend/access/common/relation.c @@ -21,12 +21,14 @@ #include "postgres.h" #include "access/relation.h" +#include "access/sysattr.h" #include "access/xact.h" #include "catalog/namespace.h" #include "miscadmin.h" #include "pgstat.h" #include "storage/lmgr.h" #include "utils/inval.h" +#include "utils/lsyscache.h" #include "utils/syscache.h" @@ -215,3 +217,23 @@ relation_close(Relation relation, LOCKMODE lockmode) if (lockmode != NoLock) UnlockRelationId(&relid, lockmode); } + +/* + * Return a bitmapset of attributes given the list of column names + */ +Bitmapset * +get_table_columnset(Oid relid, List *columns, Bitmapset *att_map) +{ + ListCell *cell; + + foreach(cell, columns) + { + const char *attname = lfirst(cell); + int attnum = get_attnum(relid, attname); + + if (!bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, att_map)) + att_map = bms_add_member(att_map, + attnum - FirstLowInvalidHeapAttributeNumber); + } + return att_map; +} diff --git a/src/backend/catalog/pg_publication.c b/src/backend/catalog/pg_publication.c index 63579b2f82..de5f3266cd 100644 --- a/src/backend/catalog/pg_publication.c +++ b/src/backend/catalog/pg_publication.c @@ -50,8 +50,12 @@ * error if not. 
*/ static void -check_publication_add_relation(Relation targetrel) +check_publication_add_relation(Relation targetrel, List *targetcols) { + bool replidentfull = (targetrel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + Oid relid = RelationGetRelid(targetrel); + Bitmapset *idattrs; + /* Must be a regular or partitioned table */ if (RelationGetForm(targetrel)->relkind != RELKIND_RELATION && RelationGetForm(targetrel)->relkind != RELKIND_PARTITIONED_TABLE) @@ -82,6 +86,36 @@ check_publication_add_relation(Relation targetrel) errmsg("cannot add relation \"%s\" to publication", RelationGetRelationName(targetrel)), errdetail("This operation is not supported for unlogged tables."))); + + /* + * Cannot specify column filter when REPLICA IDENTITY IS FULL or if column + * filter does not contain REPLICA IDENITY columns + */ + if (targetcols != NIL) + { + if (replidentfull) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s\" to publication", + RelationGetRelationName(targetrel)), + errdetail("Cannot have column filter with REPLICA IDENTITY FULL"))); + else + { + Bitmapset *filtermap = NULL; + + idattrs = RelationGetIndexAttrBitmap(targetrel, INDEX_ATTR_BITMAP_IDENTITY_KEY); + + filtermap = get_table_columnset(relid, targetcols, filtermap); + if (!bms_is_subset(idattrs, filtermap)) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("cannot add relation \"%s\" to publication", + RelationGetRelationName(targetrel)), + errdetail("Column filter must include REPLICA IDENTITY columns"))); + } + } + } } /* @@ -270,6 +304,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ObjectAddress myself, referenced; List *relids = NIL; + ListCell *lc; + List *target_cols = NIL; rel = table_open(PublicationRelRelationId, RowExclusiveLock); @@ -292,7 +328,14 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, RelationGetRelationName(targetrel->relation), pub->name))); } - 
check_publication_add_relation(targetrel->relation); + foreach(lc, targetrel->columns) + { + char *colname; + + colname = strVal(lfirst(lc)); + target_cols = lappend(target_cols, colname); + } + check_publication_add_relation(targetrel->relation, target_cols); /* Form a tuple. */ memset(values, 0, sizeof(values)); @@ -305,6 +348,8 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, ObjectIdGetDatum(pubid); values[Anum_pg_publication_rel_prrelid - 1] = ObjectIdGetDatum(relid); + values[Anum_pg_publication_rel_prattrs - 1] = + PointerGetDatum(strlist_to_textarray(target_cols)); tup = heap_form_tuple(RelationGetDescr(rel), values, nulls); @@ -313,7 +358,16 @@ publication_add_relation(Oid pubid, PublicationRelInfo *targetrel, heap_freetuple(tup); ObjectAddressSet(myself, PublicationRelRelationId, prrelid); + foreach(lc, target_cols) + { + int attnum; + attnum = get_attnum(relid, lfirst(lc)); + + /* Add dependency on the column */ + ObjectAddressSubSet(referenced, RelationRelationId, relid, attnum); + recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); + } /* Add dependency on the publication */ ObjectAddressSet(referenced, PublicationRelationId, pubid); recordDependencyOn(&myself, &referenced, DEPENDENCY_AUTO); diff --git a/src/backend/commands/publicationcmds.c b/src/backend/commands/publicationcmds.c index 7d4a0e95f6..2b06deed6b 100644 --- a/src/backend/commands/publicationcmds.c +++ b/src/backend/commands/publicationcmds.c @@ -561,7 +561,8 @@ AlterPublicationTables(AlterPublicationStmt *stmt, HeapTuple tup, pubrel = palloc(sizeof(PublicationRelInfo)); pubrel->relation = oldrel; - + /* This is not needed to delete a table */ + pubrel->columns = NIL; delrels = lappend(delrels, pubrel); } } @@ -932,6 +933,8 @@ OpenTableList(List *tables) pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = NIL; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, myrelid); @@ -965,8 +968,11 @@ OpenTableList(List 
*tables) /* find_all_inheritors already got lock */ rel = table_open(childrelid, NoLock); + pub_rel = palloc(sizeof(PublicationRelInfo)); pub_rel->relation = rel; + pub_rel->columns = NIL; + rels = lappend(rels, pub_rel); relids = lappend_oid(relids, childrelid); } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index 297b6ee715..081b432d8f 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -4832,6 +4832,7 @@ _copyPublicationTable(const PublicationTable *from) PublicationTable *newnode = makeNode(PublicationTable); COPY_NODE_FIELD(relation); + COPY_NODE_FIELD(columns); return newnode; } diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c index f537d3eb96..f871e61f5f 100644 --- a/src/backend/nodes/equalfuncs.c +++ b/src/backend/nodes/equalfuncs.c @@ -2312,6 +2312,7 @@ static bool _equalPublicationTable(const PublicationTable *a, const PublicationTable *b) { COMPARE_NODE_FIELD(relation); + COMPARE_NODE_FIELD(columns); return true; } diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 86ce33bd97..695efd784d 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -9654,12 +9654,13 @@ CreatePublicationStmt: * relation_expr here. 
*/ PublicationObjSpec: - TABLE relation_expr + TABLE relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_TABLE; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $2; + $$->pubtable->columns = $3; } | ALL TABLES IN_P SCHEMA ColId { @@ -9674,28 +9675,37 @@ PublicationObjSpec: $$->pubobjtype = PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA; $$->location = @5; } - | ColId + | ColId opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; - $$->name = $1; + if ($2) + { + $$->pubtable = makeNode(PublicationTable); + $$->pubtable->relation = makeRangeVar(NULL, $1, @1); + $$->pubtable->columns = $2; + } + else + $$->name = $1; $$->location = @1; } - | ColId indirection + | ColId indirection opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = makeRangeVarFromQualifiedName($1, $2, @1, yyscanner); + $$->pubtable->columns = $3; $$->location = @1; } /* grammar like tablename * , ONLY tablename, ONLY ( tablename ) */ - | extended_relation_expr + | extended_relation_expr opt_column_list { $$ = makeNode(PublicationObjSpec); $$->pubobjtype = PUBLICATIONOBJ_CONTINUATION; $$->pubtable = makeNode(PublicationTable); $$->pubtable->relation = $1; + $$->pubtable->columns = $2; } | CURRENT_SCHEMA { @@ -17356,6 +17366,16 @@ preprocess_pubobj_list(List *pubobjspec_list, core_yyscan_t yyscanner) else if (pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_SCHEMA || pubobj->pubobjtype == PUBLICATIONOBJ_TABLE_IN_CUR_SCHEMA) { + /* + * This can happen if a column list is specified in a continuation + * for a schema entry; reject it. 
+ */ + if (pubobj->pubtable) + ereport(ERROR, + errcode(ERRCODE_SYNTAX_ERROR), + errmsg("column specification not allowed for schemas"), + parser_errposition(pubobj->location)); + /* * We can distinguish between the different type of schema * objects based on whether name and pubtable is set. diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 9f5bf4b639..15d8192238 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -29,9 +29,9 @@ #define TRUNCATE_CASCADE (1<<0) #define TRUNCATE_RESTART_SEQS (1<<1) -static void logicalrep_write_attrs(StringInfo out, Relation rel); +static void logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map); static void logicalrep_write_tuple(StringInfo out, Relation rel, - HeapTuple tuple, bool binary); + HeapTuple tuple, bool binary, Bitmapset *att_map); static void logicalrep_read_attrs(StringInfo in, LogicalRepRelation *rel); static void logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple); @@ -398,7 +398,7 @@ logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn) */ void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, - HeapTuple newtuple, bool binary) + HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, LOGICAL_REP_MSG_INSERT); @@ -410,7 +410,7 @@ logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, pq_sendint32(out, RelationGetRelid(rel)); pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -442,7 +442,7 @@ logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup) */ void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, - HeapTuple oldtuple, HeapTuple newtuple, bool binary) + HeapTuple oldtuple, HeapTuple newtuple, bool binary, Bitmapset *att_map) { pq_sendbyte(out, 
LOGICAL_REP_MSG_UPDATE); @@ -463,11 +463,11 @@ logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, pq_sendbyte(out, 'O'); /* old tuple follows */ else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, att_map); } pq_sendbyte(out, 'N'); /* new tuple follows */ - logicalrep_write_tuple(out, rel, newtuple, binary); + logicalrep_write_tuple(out, rel, newtuple, binary, att_map); } /* @@ -536,7 +536,7 @@ logicalrep_write_delete(StringInfo out, TransactionId xid, Relation rel, else pq_sendbyte(out, 'K'); /* old key follows */ - logicalrep_write_tuple(out, rel, oldtuple, binary); + logicalrep_write_tuple(out, rel, oldtuple, binary, NULL); } /* @@ -651,7 +651,7 @@ logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, * Write relation description to the output stream. */ void -logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) +logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel, Bitmapset *att_map) { char *relname; @@ -673,7 +673,7 @@ logicalrep_write_rel(StringInfo out, TransactionId xid, Relation rel) pq_sendbyte(out, rel->rd_rel->relreplident); /* send the attribute info */ - logicalrep_write_attrs(out, rel); + logicalrep_write_attrs(out, rel, att_map); } /* @@ -749,20 +749,42 @@ logicalrep_read_typ(StringInfo in, LogicalRepTyp *ltyp) * Write a tuple to the outputstream, in the most efficient format possible. 
*/ static void -logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary) +logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binary, + Bitmapset *att_map) { TupleDesc desc; Datum values[MaxTupleAttributeNumber]; bool isnull[MaxTupleAttributeNumber]; int i; uint16 nliveatts = 0; + Bitmapset *idattrs = NULL; + bool replidentfull; + Form_pg_attribute att; desc = RelationGetDescr(rel); + replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); + if (!replidentfull) + idattrs = RelationGetIdentityKeyBitmap(rel); + for (i = 0; i < desc->natts; i++) { + att = TupleDescAttr(desc, i); if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) continue; + + /* + * Do not increment count of attributes if not a part of column + * filters except for replica identity columns or if replica identity + * is full. + */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs) && + !replidentfull) + continue; nliveatts++; } pq_sendint16(out, nliveatts); @@ -800,6 +822,19 @@ logicalrep_write_tuple(StringInfo out, Relation rel, HeapTuple tuple, bool binar continue; } + /* + * Do not send attribute data if it is not a part of column filters, + * except if it is a part of REPLICA IDENTITY or REPLICA IDENTITY is + * full, send the data. + */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs) && + !replidentfull) + continue; + typtup = SearchSysCache1(TYPEOID, ObjectIdGetDatum(att->atttypid)); if (!HeapTupleIsValid(typtup)) elog(ERROR, "cache lookup failed for type %u", att->atttypid); @@ -904,7 +939,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple) * Write relation attribute metadata to the stream. 
*/ static void -logicalrep_write_attrs(StringInfo out, Relation rel) +logicalrep_write_attrs(StringInfo out, Relation rel, Bitmapset *att_map) { TupleDesc desc; int i; @@ -914,20 +949,35 @@ logicalrep_write_attrs(StringInfo out, Relation rel) desc = RelationGetDescr(rel); - /* send number of live attributes */ - for (i = 0; i < desc->natts; i++) - { - if (TupleDescAttr(desc, i)->attisdropped || TupleDescAttr(desc, i)->attgenerated) - continue; - nliveatts++; - } - pq_sendint16(out, nliveatts); - /* fetch bitmap of REPLICATION IDENTITY attributes */ replidentfull = (rel->rd_rel->relreplident == REPLICA_IDENTITY_FULL); if (!replidentfull) idattrs = RelationGetIdentityKeyBitmap(rel); + /* send number of live attributes */ + for (i = 0; i < desc->natts; i++) + { + Form_pg_attribute att = TupleDescAttr(desc, i); + + if (att->attisdropped || att->attgenerated) + continue; + /* REPLICA IDENTITY FULL means all columns are sent as part of key. */ + if (replidentfull || + bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs)) + { + nliveatts++; + continue; + } + /* Skip sending if not a part of column filter */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) + continue; + nliveatts++; + } + pq_sendint16(out, nliveatts); + /* send the attributes */ for (i = 0; i < desc->natts; i++) { @@ -937,6 +987,17 @@ logicalrep_write_attrs(StringInfo out, Relation rel) if (att->attisdropped || att->attgenerated) continue; + /* + * Exclude filtered columns, but REPLICA IDENTITY columns can't be + * excluded + */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map) && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + idattrs) + && !replidentfull) + continue; /* REPLICA IDENTITY FULL means all columns are sent as part of key. 
*/ if (replidentfull || bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f07983a43c..953304b200 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -111,6 +111,7 @@ #include "replication/origin.h" #include "storage/ipc.h" #include "storage/lmgr.h" +#include "utils/array.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -695,19 +696,28 @@ fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel) { WalRcvExecResult *res; + WalRcvExecResult *res_pub; StringInfoData cmd; TupleTableSlot *slot; - Oid tableRow[] = {OIDOID, CHAROID, CHAROID}; + TupleTableSlot *slot_pub; + Oid tableRow[] = {OIDOID, CHAROID, CHAROID, BOOLOID}; Oid attrRow[] = {TEXTOID, OIDOID, BOOLOID}; + Oid pubRow[] = {TEXTARRAYOID}; bool isnull; - int natt; + int natt, + i; + Datum *elems; + int nelems; + List *pub_columns = NIL; + ListCell *lc; + bool am_partition = false; lrel->nspname = nspname; lrel->relname = relname; /* First fetch Oid and replica identity. 
*/ initStringInfo(&cmd); - appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind" + appendStringInfo(&cmd, "SELECT c.oid, c.relreplident, c.relkind, c.relispartition" " FROM pg_catalog.pg_class c" " INNER JOIN pg_catalog.pg_namespace n" " ON (c.relnamespace = n.oid)" @@ -737,6 +747,7 @@ fetch_remote_table_info(char *nspname, char *relname, Assert(!isnull); lrel->relkind = DatumGetChar(slot_getattr(slot, 3, &isnull)); Assert(!isnull); + am_partition = DatumGetChar(slot_getattr(slot, 4, &isnull)); ExecDropSingleTupleTableSlot(slot); walrcv_clear_result(res); @@ -774,11 +785,80 @@ fetch_remote_table_info(char *nspname, char *relname, natt = 0; slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); + + /* + * Now, fetch the values of publications' column filters For a partition, + * use pg_inherit to find the parent, as the pg_publication_rel contains + * only the topmost parent table entry in case the table is partitioned. + * Run a recursive query to iterate through all the parents of the + * partition and retreive the record for the parent that exists in + * pg_publication_rel. 
+ */ + resetStringInfo(&cmd); + if (!am_partition) + appendStringInfo(&cmd, "SELECT prattrs from pg_publication_rel" + " WHERE prrelid = %u", lrel->remoteid); + else + appendStringInfo(&cmd, "WITH RECURSIVE t(inhparent) AS ( SELECT inhparent from pg_inherits where inhrelid = %u" + " UNION SELECT pg.inhparent from pg_inherits pg, t where inhrelid = t.inhparent)" + " SELECT prattrs from pg_publication_rel WHERE prrelid IN (SELECT inhparent from t)", lrel->remoteid); + + res_pub = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, + lengthof(pubRow), pubRow); + + if (res_pub->status != WALRCV_OK_TUPLES) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_FAILURE), + errmsg("could not fetch published columns info for table \"%s.%s\" from publisher: %s", + nspname, relname, res_pub->err))); + slot_pub = MakeSingleTupleTableSlot(res_pub->tupledesc, &TTSOpsMinimalTuple); + + while (tuplestore_gettupleslot(res_pub->tuplestore, true, false, slot_pub)) + { + deconstruct_array(DatumGetArrayTypePCopy(slot_getattr(slot_pub, 1, &isnull)), + TEXTOID, -1, false, 'i', + &elems, NULL, &nelems); + for (i = 0; i < nelems; i++) + pub_columns = lappend(pub_columns, TextDatumGetCString(elems[i])); + ExecClearTuple(slot_pub); + } + ExecDropSingleTupleTableSlot(slot_pub); + walrcv_clear_result(res_pub); + + /* + * Store the column names only if they are contained in column filter + * LogicalRepRelation will only contain attributes corresponding to those + * specficied in column filters. 
+ */ while (tuplestore_gettupleslot(res->tuplestore, true, false, slot)) { - lrel->attnames[natt] = - TextDatumGetCString(slot_getattr(slot, 1, &isnull)); + char *rel_colname; + bool found = false; + + rel_colname = TextDatumGetCString(slot_getattr(slot, 1, &isnull)); Assert(!isnull); + if (pub_columns != NIL) + { + foreach(lc, pub_columns) + { + char *pub_colname = lfirst(lc); + + if (!strcmp(pub_colname, rel_colname)) + { + found = true; + lrel->attnames[natt] = rel_colname; + break; + } + } + } + else + { + found = true; + lrel->attnames[natt] = rel_colname; + } + if (!found) + continue; + lrel->atttyps[natt] = DatumGetObjectId(slot_getattr(slot, 2, &isnull)); Assert(!isnull); if (DatumGetBool(slot_getattr(slot, 3, &isnull))) @@ -829,8 +909,17 @@ copy_table(Relation rel) /* Start copy on the publisher. */ initStringInfo(&cmd); if (lrel.relkind == RELKIND_RELATION) - appendStringInfo(&cmd, "COPY %s TO STDOUT", + { + appendStringInfo(&cmd, "COPY %s (", quote_qualified_identifier(lrel.nspname, lrel.relname)); + for (int i = 0; i < lrel.natts; i++) + { + appendStringInfoString(&cmd, quote_identifier(lrel.attnames[i])); + if (i < lrel.natts - 1) + appendStringInfoString(&cmd, ", "); + } + appendStringInfo(&cmd, ") TO STDOUT"); + } else { /* diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 6f6a203dea..e33cb29cb3 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -15,16 +15,19 @@ #include "access/tupconvert.h" #include "catalog/partition.h" #include "catalog/pg_publication.h" +#include "catalog/pg_publication_rel_d.h" #include "commands/defrem.h" #include "fmgr.h" #include "replication/logical.h" #include "replication/logicalproto.h" #include "replication/origin.h" #include "replication/pgoutput.h" +#include "utils/builtins.h" #include "utils/int8.h" #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include 
"utils/rel.h" #include "utils/syscache.h" #include "utils/varlena.h" @@ -81,7 +84,8 @@ static List *LoadPublications(List *pubnames); static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx); + LogicalDecodingContext *ctx, + Bitmapset *att_map); static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); @@ -130,6 +134,7 @@ typedef struct RelationSyncEntry * having identical TupleDesc. */ TupleConversionMap *map; + Bitmapset *att_map; } RelationSyncEntry; /* Map used to remember which relation schemas we sent. */ @@ -570,11 +575,11 @@ maybe_send_schema(LogicalDecodingContext *ctx, } MemoryContextSwitchTo(oldctx); - send_relation_and_attrs(ancestor, xid, ctx); + send_relation_and_attrs(ancestor, xid, ctx, relentry->att_map); RelationClose(ancestor); } - send_relation_and_attrs(relation, xid, ctx); + send_relation_and_attrs(relation, xid, ctx, relentry->att_map); if (in_streaming) set_schema_sent_in_streamed_txn(relentry, topxid); @@ -587,7 +592,8 @@ maybe_send_schema(LogicalDecodingContext *ctx, */ static void send_relation_and_attrs(Relation relation, TransactionId xid, - LogicalDecodingContext *ctx) + LogicalDecodingContext *ctx, + Bitmapset *att_map) { TupleDesc desc = RelationGetDescr(relation); int i; @@ -610,13 +616,23 @@ send_relation_and_attrs(Relation relation, TransactionId xid, if (att->atttypid < FirstGenbkiObjectId) continue; + /* + * Do not send type information if attribute is not present in column + * filter. XXX Allow sending type information for REPLICA IDENTITY + * COLUMNS with user created type. even when they are not mentioned in + * column filters. 
+ */ + if (att_map != NULL && + !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber, + att_map)) + continue; OutputPluginPrepareWrite(ctx, false); logicalrep_write_typ(ctx->out, xid, att->atttypid); OutputPluginWrite(ctx, false); } OutputPluginPrepareWrite(ctx, false); - logicalrep_write_rel(ctx->out, xid, relation); + logicalrep_write_rel(ctx->out, xid, relation, att_map); OutputPluginWrite(ctx, false); } @@ -693,7 +709,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_insert(ctx->out, xid, relation, tuple, - data->binary); + data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -722,7 +738,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginPrepareWrite(ctx, true); logicalrep_write_update(ctx->out, xid, relation, oldtuple, - newtuple, data->binary); + newtuple, data->binary, relentry->att_map); OutputPluginWrite(ctx, true); break; } @@ -1122,6 +1138,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) bool am_partition = get_rel_relispartition(relid); char relkind = get_rel_relkind(relid); bool found; + Oid ancestor_id; MemoryContext oldctx; Assert(RelationSyncCache != NULL); @@ -1142,6 +1159,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) entry->pubactions.pubinsert = entry->pubactions.pubupdate = entry->pubactions.pubdelete = entry->pubactions.pubtruncate = false; entry->publish_as_relid = InvalidOid; + entry->att_map = NULL; entry->map = NULL; /* will be set by maybe_send_schema() if * needed */ } @@ -1182,6 +1200,7 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) { Publication *pub = lfirst(lc); bool publish = false; + bool ancestor_published = false; if (pub->alltables) { @@ -1192,7 +1211,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (!publish) { - bool ancestor_published = false; /* * For a partition, check if any of the ancestors are @@ -1219,6 +1237,7 @@ get_rel_sync_entry(PGOutputData 
*data, Oid relid) pub->oid)) { ancestor_published = true; + ancestor_id = ancestor; if (pub->pubviaroot) publish_as_relid = ancestor; } @@ -1239,15 +1258,49 @@ get_rel_sync_entry(PGOutputData *data, Oid relid) if (publish && (relkind != RELKIND_PARTITIONED_TABLE || pub->pubviaroot)) { + int nelems, + i; + bool isnull; + Datum *elems; + HeapTuple pub_rel_tuple; + Datum pub_rel_cols; + List *columns = NIL; + + pub_rel_tuple = + SearchSysCache2(PUBLICATIONRELMAP, + ancestor_published ? ObjectIdGetDatum(ancestor_id) : + ObjectIdGetDatum(publish_as_relid), + ObjectIdGetDatum(pub->oid)); + + if (HeapTupleIsValid(pub_rel_tuple)) + { + pub_rel_cols = SysCacheGetAttr(PUBLICATIONRELMAP, + pub_rel_tuple, + Anum_pg_publication_rel_prattrs, + &isnull); + if (!isnull) + { + /* FIXME it's a bad idea to use CacheMemoryContext + * directly here. Must produce the map first in a + * tmp context, then copy to CacheMemoryContext + */ + oldctx = MemoryContextSwitchTo(CacheMemoryContext); + deconstruct_array(DatumGetArrayTypePCopy(pub_rel_cols), + TEXTOID, -1, false, 'i', + &elems, NULL, &nelems); + for (i = 0; i < nelems; i++) + columns = lappend(columns, TextDatumGetCString(elems[i])); + entry->att_map = get_table_columnset(publish_as_relid, columns, entry->att_map); + MemoryContextSwitchTo(oldctx); + } + ReleaseSysCache(pub_rel_tuple); + } entry->pubactions.pubinsert |= pub->pubactions.pubinsert; entry->pubactions.pubupdate |= pub->pubactions.pubupdate; entry->pubactions.pubdelete |= pub->pubactions.pubdelete; entry->pubactions.pubtruncate |= pub->pubactions.pubtruncate; } - if (entry->pubactions.pubinsert && entry->pubactions.pubupdate && - entry->pubactions.pubdelete && entry->pubactions.pubtruncate) - break; } list_free(pubids); @@ -1343,6 +1396,8 @@ rel_sync_cache_relation_cb(Datum arg, Oid relid) entry->schema_sent = false; list_free(entry->streamed_txns); entry->streamed_txns = NIL; + bms_free(entry->att_map); + entry->att_map = NULL; if (entry->map) { /* diff --git 
a/src/include/catalog/pg_publication.h b/src/include/catalog/pg_publication.h index 1ae439e6f3..d16e0218dc 100644 --- a/src/include/catalog/pg_publication.h +++ b/src/include/catalog/pg_publication.h @@ -86,6 +86,7 @@ typedef struct Publication typedef struct PublicationRelInfo { Relation relation; + List *columns; } PublicationRelInfo; extern Publication *GetPublication(Oid pubid); diff --git a/src/include/catalog/pg_publication_rel.h b/src/include/catalog/pg_publication_rel.h index b5d5504cbb..11695e8478 100644 --- a/src/include/catalog/pg_publication_rel.h +++ b/src/include/catalog/pg_publication_rel.h @@ -31,6 +31,9 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) Oid oid; /* oid */ Oid prpubid BKI_LOOKUP(pg_publication); /* Oid of the publication */ Oid prrelid BKI_LOOKUP(pg_class); /* Oid of the relation */ +#ifdef CATALOG_VARLEN + text prattrs[1]; /* Variable length field starts here */ +#endif } FormData_pg_publication_rel; /* ---------------- @@ -40,6 +43,7 @@ CATALOG(pg_publication_rel,6106,PublicationRelRelationId) */ typedef FormData_pg_publication_rel *Form_pg_publication_rel; +DECLARE_TOAST(pg_publication_rel, 8895, 8896); DECLARE_UNIQUE_INDEX_PKEY(pg_publication_rel_oid_index, 6112, PublicationRelObjectIndexId, on pg_publication_rel using btree(oid oid_ops)); DECLARE_UNIQUE_INDEX(pg_publication_rel_prrelid_prpubid_index, 6113, PublicationRelPrrelidPrpubidIndexId, on pg_publication_rel using btree(prrelid oid_ops, prpubid oid_ops)); diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h index 067138e6b5..f19e9508ba 100644 --- a/src/include/nodes/parsenodes.h +++ b/src/include/nodes/parsenodes.h @@ -3641,6 +3641,7 @@ typedef struct PublicationTable { NodeTag type; RangeVar *relation; /* relation to be published */ + List *columns; /* List of columns in a publication table */ } PublicationTable; /* diff --git a/src/include/replication/logicalproto.h b/src/include/replication/logicalproto.h index 83741dcf42..709b4be916 
100644 --- a/src/include/replication/logicalproto.h +++ b/src/include/replication/logicalproto.h @@ -207,11 +207,11 @@ extern void logicalrep_write_origin(StringInfo out, const char *origin, extern char *logicalrep_read_origin(StringInfo in, XLogRecPtr *origin_lsn); extern void logicalrep_write_insert(StringInfo out, TransactionId xid, Relation rel, HeapTuple newtuple, - bool binary); + bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_insert(StringInfo in, LogicalRepTupleData *newtup); extern void logicalrep_write_update(StringInfo out, TransactionId xid, Relation rel, HeapTuple oldtuple, - HeapTuple newtuple, bool binary); + HeapTuple newtuple, bool binary, Bitmapset *att_map); extern LogicalRepRelId logicalrep_read_update(StringInfo in, bool *has_oldtuple, LogicalRepTupleData *oldtup, LogicalRepTupleData *newtup); @@ -228,7 +228,7 @@ extern List *logicalrep_read_truncate(StringInfo in, extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn, bool transactional, const char *prefix, Size sz, const char *message); extern void logicalrep_write_rel(StringInfo out, TransactionId xid, - Relation rel); + Relation rel, Bitmapset *att_map); extern LogicalRepRelation *logicalrep_read_rel(StringInfo in); extern void logicalrep_write_typ(StringInfo out, TransactionId xid, Oid typoid); diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h index 31281279cf..1a4eb490db 100644 --- a/src/include/utils/rel.h +++ b/src/include/utils/rel.h @@ -681,5 +681,6 @@ RelationGetSmgr(Relation rel) /* routines in utils/cache/relcache.c */ extern void RelationIncrementReferenceCount(Relation rel); extern void RelationDecrementReferenceCount(Relation rel); +extern Bitmapset *get_table_columnset(Oid relid, List *columns, Bitmapset *att_map); #endif /* REL_H */ -- 2.30.2