This is an automated email from the ASF dual-hosted git repository.

reshke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/cloudberry.git

commit a790f4cbdf0f9aeff7d14c6e6d171f25d6b4c20e
Author: Amit Kapila <[email protected]>
AuthorDate: Tue Jun 21 07:52:41 2022 +0530

    Fix partition table's REPLICA IDENTITY checking on the subscriber.
    
    In logical replication, we check whether the target table on the
    subscriber is updatable by comparing the replica identity of the table
    on the publisher with that of the table on the subscriber. When the
    target table is a partitioned table, we only check its own replica
    identity and not that of its partitions. This leads to an assertion
    failure while applying UPDATE/DELETE changes, as we expect those to
    succeed only when the corresponding partition has a primary key or a
    replica identity defined.
    
    Fix it by checking the replica identity of the leaf partition to which
    an UPDATE or DELETE is routed while applying changes.
    
    Reported-by: Shi Yu
    Author: Shi Yu, Hou Zhijie
    Reviewed-by: Amit Langote, Amit Kapila
    Backpatch-through: 13, where it was introduced
    Discussion: https://postgr.es/m/oszpr01mb6310f46cd425a967e4aef736fd...@oszpr01mb6310.jpnprd01.prod.outlook.com
---
 src/backend/replication/logical/relation.c | 115 +++++++++++++++++------------
 src/backend/replication/logical/worker.c   |  27 +++++--
 src/test/subscription/t/013_partition.pl   |  16 +++-
 3 files changed, 102 insertions(+), 56 deletions(-)

diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index a9fa26fe686..f1cff93b920 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -249,6 +249,67 @@ logicalrep_report_missing_attrs(LogicalRepRelation *remoterel,
        }
 }
 
+/*
+ * Check whether the local replica identity suffices and set entry->updatable.
+ *
+ * We allow for a stricter replica identity (fewer columns) on the subscriber
+ * as that will not stop us from finding a unique tuple. E.g., if the
+ * publisher has identity (id,timestamp) and the subscriber just (id), this
+ * will not be a problem, but in the opposite scenario it will.
+ *
+ * We just mark the relation entry as not updatable here if the local
+ * replica identity is found to be insufficient for applying
+ * updates/deletes (inserts don't care!) and leave it to
+ * check_relation_updatable() to throw the actual error if needed.
+ */
+static void
+logicalrep_rel_mark_updatable(LogicalRepRelMapEntry *entry)
+{
+       Bitmapset  *idkey;
+       LogicalRepRelation *remoterel = &entry->remoterel;
+       int                     i;
+
+       entry->updatable = true;
+
+       idkey = RelationGetIndexAttrBitmap(entry->localrel,
+                                                                          
INDEX_ATTR_BITMAP_IDENTITY_KEY);
+       /* Fall back to the primary key if there is no replica identity index. */
+       if (idkey == NULL)
+       {
+               idkey = RelationGetIndexAttrBitmap(entry->localrel,
+                                                                               
   INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+               /*
+                * If no replica identity index and no PK, the published table 
must
+                * have replica identity FULL.
+                */
+               if (idkey == NULL && remoterel->replident != 
REPLICA_IDENTITY_FULL)
+                       entry->updatable = false;
+       }
+
+       i = -1;
+       while ((i = bms_next_member(idkey, i)) >= 0)
+       {
+               int                     attnum = i + 
FirstLowInvalidHeapAttributeNumber;
+
+               if (!AttrNumberIsForUserDefinedAttr(attnum))
+                       ereport(ERROR,
+                                       
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("logical replication target 
relation \"%s.%s\" uses "
+                                                       "system columns in 
REPLICA IDENTITY index",
+                                                       remoterel->nspname, 
remoterel->relname)));
+
+               attnum = AttrNumberGetAttrOffset(attnum);
+
+               if (entry->attrmap->attnums[attnum] < 0 ||
+                       !bms_is_member(entry->attrmap->attnums[attnum], 
remoterel->attkeys))
+               {
+                       entry->updatable = false;
+                       break;
+               }
+       }
+}
+
 /*
  * Open the local relation associated with the remote one.
  *
@@ -307,7 +368,6 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
        if (!entry->localrelvalid)
        {
                Oid                     relid;
-               Bitmapset  *idkey;
                TupleDesc       desc;
                MemoryContext oldctx;
                int                     i;
@@ -373,54 +433,10 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
                bms_free(missingatts);
 
                /*
-                * Check that replica identity matches. We allow for stricter 
replica
-                * identity (fewer columns) on subscriber as that will not stop 
us
-                * from finding unique tuple. IE, if publisher has identity
-                * (id,timestamp) and subscriber just (id) this will not be a 
problem,
-                * but in the opposite scenario it will.
-                *
-                * Don't throw any error here just mark the relation entry as 
not
-                * updatable, as replica identity is only for updates and 
deletes but
-                * inserts can be replicated even without it.
+                * Set if the table's replica identity is enough to apply
+                * update/delete.
                 */
-               entry->updatable = true;
-               idkey = RelationGetIndexAttrBitmap(entry->localrel,
-                                                                               
   INDEX_ATTR_BITMAP_IDENTITY_KEY);
-               /* fallback to PK if no replica identity */
-               if (idkey == NULL)
-               {
-                       idkey = RelationGetIndexAttrBitmap(entry->localrel,
-                                                                               
           INDEX_ATTR_BITMAP_PRIMARY_KEY);
-
-                       /*
-                        * If no replica identity index and no PK, the 
published table
-                        * must have replica identity FULL.
-                        */
-                       if (idkey == NULL && remoterel->replident != 
REPLICA_IDENTITY_FULL)
-                               entry->updatable = false;
-               }
-
-               i = -1;
-               while ((i = bms_next_member(idkey, i)) >= 0)
-               {
-                       int                     attnum = i + 
FirstLowInvalidHeapAttributeNumber;
-
-                       if (!AttrNumberIsForUserDefinedAttr(attnum))
-                               ereport(ERROR,
-                                               
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                                                errmsg("logical replication 
target relation \"%s.%s\" uses "
-                                                               "system columns 
in REPLICA IDENTITY index",
-                                                               
remoterel->nspname, remoterel->relname)));
-
-                       attnum = AttrNumberGetAttrOffset(attnum);
-
-                       if (entry->attrmap->attnums[attnum] < 0 ||
-                               !bms_is_member(entry->attrmap->attnums[attnum], 
remoterel->attkeys))
-                       {
-                               entry->updatable = false;
-                               break;
-                       }
-               }
+               logicalrep_rel_mark_updatable(entry);
 
                entry->localrelvalid = true;
        }
@@ -658,7 +674,8 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
                           attrmap->maplen * sizeof(AttrNumber));
        }
 
-       entry->updatable = root->updatable;
+       /* Set if the table's replica identity is enough to apply 
update/delete. */
+       logicalrep_rel_mark_updatable(entry);
 
        entry->localrelvalid = true;
 
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5335e5f5c62..7190dd94ebf 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1323,6 +1323,13 @@ apply_handle_insert_internal(ApplyExecutionData *edata,
 static void
 check_relation_updatable(LogicalRepRelMapEntry *rel)
 {
+       /*
+        * For partitioned tables, we only need to care if the target partition 
is
+        * updatable (aka has PK or RI defined for it).
+        */
+       if (rel->localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+               return;
+
        /* Updatable, no error. */
        if (rel->updatable)
                return;
@@ -1676,6 +1683,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
        TupleTableSlot *remoteslot_part;
        TupleConversionMap *map;
        MemoryContext oldctx;
+       LogicalRepRelMapEntry *part_entry = NULL;
+       AttrMap    *attrmap = NULL;
 
        /* ModifyTableState is needed for ExecFindPartition(). */
        edata->mtstate = mtstate = makeNode(ModifyTableState);
@@ -1707,8 +1716,11 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                remoteslot_part = table_slot_create(partrel, 
&estate->es_tupleTable);
        map = partrelinfo->ri_RootToPartitionMap;
        if (map != NULL)
-               remoteslot_part = execute_attr_map_slot(map->attrMap, 
remoteslot,
+       {
+               attrmap = map->attrMap;
+               remoteslot_part = execute_attr_map_slot(attrmap, remoteslot,
                                                                                
                remoteslot_part);
+       }
        else
        {
                remoteslot_part = ExecCopySlot(remoteslot_part, remoteslot);
@@ -1716,6 +1728,14 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
        }
        MemoryContextSwitchTo(oldctx);
 
+       /* Check if we can do the update or delete on the leaf partition. */
+       if (operation == CMD_UPDATE || operation == CMD_DELETE)
+       {
+               part_entry = logicalrep_partition_open(relmapentry, partrel,
+                                                                               
           attrmap);
+               check_relation_updatable(part_entry);
+       }
+
        switch (operation)
        {
                case CMD_INSERT:
@@ -1737,15 +1757,10 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
                         * suitable partition.
                         */
                        {
-                               AttrMap    *attrmap = map ? map->attrMap : NULL;
-                               LogicalRepRelMapEntry *part_entry;
                                TupleTableSlot *localslot;
                                ResultRelInfo *partrelinfo_new;
                                bool            found;
 
-                               part_entry = 
logicalrep_partition_open(relmapentry, partrel,
-                                                                               
                           attrmap);
-
                                /* Get the matching local tuple from the 
partition. */
                                found = FindReplTupleInLocalRel(estate, partrel,
                                                                                
                &part_entry->remoterel,
diff --git a/src/test/subscription/t/013_partition.pl b/src/test/subscription/t/013_partition.pl
index 568e4d104e0..dfe2cb6deae 100644
--- a/src/test/subscription/t/013_partition.pl
+++ b/src/test/subscription/t/013_partition.pl
@@ -6,7 +6,7 @@ use strict;
 use warnings;
 use PostgresNode;
 use TestLib;
-use Test::More tests => 70;
+use Test::More tests => 71;
 
 # setup
 
@@ -856,3 +856,17 @@ $node_publisher->wait_for_catchup('sub2');
 $result = $node_subscriber2->safe_psql('postgres',
        "SELECT a, b, c FROM tab5 ORDER BY 1");
 is($result, qq(3||1), 'updates of tab5 replicated correctly after altering 
table on publisher');
+
+# Test that replication works correctly as long as the leaf partition
+# has the necessary REPLICA IDENTITY, even though the actual target
+# partitioned table does not.
+$node_subscriber2->safe_psql('postgres',
+       "ALTER TABLE tab5 REPLICA IDENTITY NOTHING");
+
+$node_publisher->safe_psql('postgres', "UPDATE tab5 SET a = 4 WHERE a = 3");
+
+$node_publisher->wait_for_catchup('sub2');
+
+$result = $node_subscriber2->safe_psql('postgres',
+       "SELECT a, b, c FROM tab5_1 ORDER BY 1");
+is($result, qq(4||1), 'updates of tab5 replicated correctly');


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to