Hi Hou zj,  Shi-san, all

> In this function, it used the local column number(keycol) to match the
> remote
> column number(attkeys), I think it will cause problem if the column order
> between pub/sub doesn't match. Like:
>
> -------
> - pub
> CREATE TABLE test_replica_id_full (x int, y int);
> ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;
> CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full;
> - sub
> CREATE TABLE test_replica_id_full (z int, y int, x int);
> CREATE unique INDEX idx ON test_replica_id_full(z);
> CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION 'dbname=postgres
> port=5432' PUBLICATION tap_pub_rep_full;
> -------
>
> I think we need to use the attrmap->attnums to convert the column number
> before
> comparing. Just for reference, attach a diff(0001) which I noted down when
> trying to
> fix the problem.
>

I'm always afraid of these types of last minute additions to the patch, and
here we have
this issue on one of the latest addition :(

Thanks for reporting the problem and also providing guidance on the fix.
After reading
codes on attrMap and debugging this case further, I think your suggestion
makes sense.

I only made some small changes, and included them in the patch.


> Besides, I also look at the "WIP: Optimize for non-pkey / non-RI unique
> indexes" patch, I think it also had a similar problem about the column
> matching


Right, I'll incorporate this fix to that one as well.


> . And another thing I think we can improved in this WIP patch is that
> we can cache the result of IsIdxSafeToSkipDuplicates() instead of doing it
> for
> each UPDATE, because the cost of this function becomes bigger after
> applying
> this patch


Yes, it makes sense.


>
> Thanks for Shi-san for helping to finish these fixes.
>
> Thank you both!


Onder Kalaci
From 115999a134635ff6bef4caab434e6ece5d77c1b8 Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Wed, 22 Feb 2023 14:12:56 +0300
Subject: [PATCH v45] Allow the use of indexes other than PK and REPLICA
 IDENTITY on the subscriber.

Using REPLICA IDENTITY FULL on the publisher can lead to a full table
scan per tuple change on the subscription when REPLICA IDENTITY or PK
index is not available. This makes REPLICA IDENTITY FULL impractical
to use apart from some small number of use cases.

This patch allows using indexes other than PRIMARY KEY or REPLICA
IDENTITY on the subscriber during apply of update/delete. The index
that can be used must be a btree index, not a partial index, and it
must have at least one column reference (i.e. cannot consist of only
expressions). We can uplift these restrictions in the future. There is
no smart mechanism to pick the index. If there is more than one index
that satisfies these requirements, we just pick the first one. We discussed
using some of the optimizer's low-level APIs for this but ruled it out
as that can be a maintenance burden in the long run.

This patch improves the performance in the vast majority of cases and
the improvement is proportional to the amount of data in the table.
However, there could be some regression in a small number of cases
where the indexes have a lot of duplicate and dead rows. It was
discussed that those are mostly impractical cases but we can provide a
table or subscription level option to disable this feature if
required.

Author: Onder Kalaci
Reviewed-by: Peter Smith, Shi yu, Hou Zhijie, Vignesh C, Kuroda
Hayato, Amit Kapila
Discussion: https://postgr.es/m/CACawEhVLqmAAyPXdHEPv1ssU2c=dqOniiGz7G73HfyS7+nGV4w@mail.gmail.com
---
 doc/src/sgml/logical-replication.sgml         |   9 +-
 src/backend/executor/execReplication.c        | 111 ++--
 src/backend/replication/logical/relation.c    | 227 +++++++-
 src/backend/replication/logical/worker.c      |  56 +-
 src/include/replication/logicalrelation.h     |   5 +
 src/test/subscription/meson.build             |   1 +
 .../subscription/t/032_subscribe_use_index.pl | 514 ++++++++++++++++++
 7 files changed, 854 insertions(+), 69 deletions(-)
 create mode 100644 src/test/subscription/t/032_subscribe_use_index.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..6b0e300adc 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -132,7 +132,14 @@
    certain additional requirements) can also be set to be the replica
    identity.  If the table does not have any suitable key, then it can be set
    to replica identity <quote>full</quote>, which means the entire row becomes
-   the key.  This, however, is very inefficient and should only be used as a
+   the key.  When replica identity <quote>full</quote> is specified,
+   indexes can be used on the subscriber side for searching the rows.  Candidate
+   indexes must be btree, non-partial, and have at least one column reference
+   (i.e. cannot consist of only expressions).  These restrictions
+   on the non-unique index properties adhere to some of the restrictions that
+   are enforced for primary keys.  If there are no such suitable indexes,
+   the search on the subscriber side can be very inefficient, therefore
+   replica identity <quote>full</quote> should only be used as a
    fallback if no other solution is possible.  If a replica identity other
    than <quote>full</quote> is set on the publisher side, a replica identity
    comprising the same or fewer columns must also be set on the subscriber
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index c484f5c301..cd17be0681 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -25,6 +25,7 @@
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_relation.h"
 #include "parser/parsetree.h"
+#include "replication/logicalrelation.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -37,49 +38,63 @@
 #include "utils/typcache.h"
 
 
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+						 TypeCacheEntry **eq);
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
- * Returns whether any column contains NULLs.
+ * Returns how many columns to use for the index scan.
+ *
+ * This is not generic routine, it expects the idxrel to be a btree, non-partial
+ * and have at least one column reference (i.e. cannot consist of only
+ * expressions).
  *
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * By definition, replication identity of a rel meets all limitations associated
+ * with that. Note that any other index could also meet these limitations.
  */
-static bool
+static int
 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 						 TupleTableSlot *searchslot)
 {
-	int			attoff;
+	int			index_attoff;
+	int			skey_attoff = 0;
 	bool		isnull;
 	Datum		indclassDatum;
 	oidvector  *opclass;
 	int2vector *indkey = &idxrel->rd_index->indkey;
-	bool		hasnulls = false;
-
-	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
-		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
 
 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
 									Anum_pg_index_indclass, &isnull);
 	Assert(!isnull);
 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
 
-	/* Build scankey for every attribute in the index. */
-	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+	/* Build scankey for every non-expression attribute in the index. */
+	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+		 index_attoff++)
 	{
 		Oid			operator;
+		Oid			optype;
 		Oid			opfamily;
 		RegProcedure regop;
-		int			pkattno = attoff + 1;
-		int			mainattno = indkey->values[attoff];
-		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
+		int			table_attno = indkey->values[index_attoff];
+
+		if (!AttributeNumberIsValid(table_attno))
+		{
+			/*
+			 * XXX: Currently, we don't support expressions in the scan key,
+			 * see code below.
+			 */
+			continue;
+		}
 
 		/*
 		 * Load the operator info.  We need this to get the equality operator
 		 * function for the scan key.
 		 */
-		opfamily = get_opclass_family(opclass->values[attoff]);
+		optype = get_opclass_input_type(opclass->values[index_attoff]);
+		opfamily = get_opclass_family(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
@@ -91,23 +106,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
-		ScanKeyInit(&skey[attoff],
-					pkattno,
+		ScanKeyInit(&skey[skey_attoff],
+					index_attoff + 1,
 					BTEqualStrategyNumber,
 					regop,
-					searchslot->tts_values[mainattno - 1]);
+					searchslot->tts_values[table_attno - 1]);
 
-		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+		skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 
 		/* Check for null value. */
-		if (searchslot->tts_isnull[mainattno - 1])
-		{
-			hasnulls = true;
-			skey[attoff].sk_flags |= SK_ISNULL;
-		}
+		if (searchslot->tts_isnull[table_attno - 1])
+			skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
+
+		skey_attoff++;
 	}
 
-	return hasnulls;
+	/* There must always be at least one attribute for the index scan. */
+	Assert(skey_attoff > 0);
+
+	return skey_attoff;
 }
 
 /*
@@ -123,33 +140,57 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 							 TupleTableSlot *outslot)
 {
 	ScanKeyData skey[INDEX_MAX_KEYS];
+	int			skey_attoff;
 	IndexScanDesc scan;
 	SnapshotData snap;
 	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
+	TypeCacheEntry **eq = NULL;
+	bool		isIdxSafeToSkipDuplicates;
 
 	/* Open the index. */
 	idxrel = index_open(idxoid, RowExclusiveLock);
 
-	/* Start an index scan. */
+	isIdxSafeToSkipDuplicates = IsIdxSafeToSkipDuplicates(rel, idxoid);
+
 	InitDirtySnapshot(snap);
-	scan = index_beginscan(rel, idxrel, &snap,
-						   IndexRelationGetNumberOfKeyAttributes(idxrel),
-						   0);
 
 	/* Build scan key. */
-	build_replindex_scan_key(skey, rel, idxrel, searchslot);
+	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+	/* Start an index scan. */
+	scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
 
 retry:
 	found = false;
 
-	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
-	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 	{
-		found = true;
+		/*
+		 * Avoid expensive equality check if the index is primary key or
+		 * replica identity index.
+		 */
+		if (!isIdxSafeToSkipDuplicates)
+		{
+			if (eq == NULL)
+			{
+#ifdef USE_ASSERT_CHECKING
+				/* apply assertions only once for the input idxoid */
+				IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
+				Assert(IsIndexUsableForReplicaIdentityFull(indexInfo));
+#endif
+
+				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+			}
+
+			if (!tuples_equal(outslot, searchslot, eq))
+				continue;
+		}
+
 		ExecMaterializeSlot(outslot);
 
 		xwait = TransactionIdIsValid(snap.xmin) ?
@@ -164,6 +205,10 @@ retry:
 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 			goto retry;
 		}
+
+		/* Found our tuple and it's not locked */
+		found = true;
+		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 55bfa07871..51f3b11694 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,8 +17,10 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_am_d.h"
 #include "catalog/pg_subscription_rel.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
@@ -50,6 +52,10 @@ typedef struct LogicalRepPartMapEntry
 	LogicalRepRelMapEntry relmapentry;
 } LogicalRepPartMapEntry;
 
+static Oid	FindLogicalRepLocalIndex(Relation localrel,
+									 LogicalRepRelation *remoterel,
+									 AttrMap *attrMap);
+
 /*
  * Relcache invalidation callback for our relation map cache.
  */
@@ -439,6 +445,15 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		 */
 		logicalrep_rel_mark_updatable(entry);
 
+		/*
+		 * Finding a usable index is an infrequent task. It occurs when an
+		 * operation is first performed on the relation, or after invalidation
+		 * of the relation cache entry (such as ANALYZE or CREATE/DROP index
+		 * on the relation).
+		 */
+		entry->localindexoid = FindLogicalRepLocalIndex(entry->localrel, remoterel,
+														entry->attrmap);
+
 		entry->localrelvalid = true;
 	}
 
@@ -697,10 +712,218 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	/* Set if the table's replica identity is enough to apply update/delete. */
 	logicalrep_rel_mark_updatable(entry);
 
-	entry->localrelvalid = true;
-
 	/* state and statelsn are left set to 0. */
 	MemoryContextSwitchTo(oldctx);
 
+	/*
+	 * Finding a usable index is an infrequent task. It occurs when an
+	 * operation is first performed on the relation, or after invalidation of
+	 * the relation cache entry (such as ANALYZE or CREATE/DROP index on the
+	 * relation).
+	 *
+	 * We also prefer to run this code on the oldctx so that we do not leak
+	 * anything in the LogicalRepPartMapContext (hence CacheMemoryContext).
+	 */
+	entry->localindexoid = FindLogicalRepLocalIndex(partrel, remoterel,
+													entry->attrmap);
+
+	entry->localrelvalid = true;
+
 	return entry;
 }
+
+/*
+ * Returns true if the given index consists only of expressions such as:
+ * 	CREATE INDEX idx ON table(foo(col));
+ *
+ * Returns false even if there is one column reference:
+ * 	 CREATE INDEX idx ON table(foo(col), col_2);
+ */
+static bool
+IsIndexOnlyOnExpression(IndexInfo *indexInfo)
+{
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		AttrNumber	attnum = indexInfo->ii_IndexAttrNumbers[i];
+
+		if (AttributeNumberIsValid(attnum))
+			return false;
+	}
+
+	return true;
+}
+
+
+/*
+ * Returns true if the attrmap (which belongs to remoterel) contains the
+ * leftmost column of the index.
+ *
+ * Otherwise returns false.
+ */
+static bool
+RemoteRelContainsLeftMostColumnOnIdx(IndexInfo  *indexInfo, AttrMap *attrmap)
+{
+	AttrNumber			keycol;
+
+	if (indexInfo->ii_NumIndexAttrs < 1)
+		return false;
+
+	keycol = indexInfo->ii_IndexAttrNumbers[0];
+	if (!AttributeNumberIsValid(keycol))
+		return false;
+
+	if (attrmap->maplen <= AttrNumberGetAttrOffset(keycol))
+		return false;
+
+	return attrmap->attnums[AttrNumberGetAttrOffset(keycol)] >= 0;
+}
+
+/*
+ * Returns the oid of an index that can be used by the apply worker to scan
+ * the relation. The index must be btree, non-partial, and have at least
+ * one column reference (i.e. cannot consist of only expressions). These
+ * limitations help to keep the index scan similar to PK/RI index scans.
+ *
+ * Note that the limitations of index scans for replica identity full only
+ * adheres to a subset of the limitations of PK/RI. For example, we support
+ * columns that are marked as [NULL] or we are not interested in the [NOT
+ * DEFERRABLE] aspect of constraints here. It works for us because we always
+ * compare the tuples for non-PK/RI index scans. See
+ * RelationFindReplTupleByIndex().
+ *
+ * XXX: There are no fundamental problems for supporting non-btree indexes.
+ * We mostly need to relax the limitations in RelationFindReplTupleByIndex().
+ * For partial indexes, the required changes are likely to be larger. If
+ * none of the tuples satisfy the expression for the index scan, we fall-back
+ * to sequential execution, which might not be a good idea in some cases.
+ *
+ * We also skip indexes if the remote relation does not contain the leftmost
+ * column of the index. This is because in most such cases sequential scan is
+ * favorable over index scan.
+ *
+ * We expect to call this function when REPLICA IDENTITY FULL is defined for
+ * the remote relation.
+ *
+ * If no suitable index is found, returns InvalidOid.
+ */
+static Oid
+FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
+{
+	List	   *idxlist = RelationGetIndexList(localrel);
+	ListCell   *lc;
+
+	foreach(lc, idxlist)
+	{
+		Oid			idxoid = lfirst_oid(lc);
+		bool		isUsableIdx;
+		bool 		containsLeftMostCol;
+		Relation	idxRel;
+		IndexInfo  *idxInfo;
+
+		idxRel = index_open(idxoid, AccessShareLock);
+		idxInfo = BuildIndexInfo(idxRel);
+		isUsableIdx = IsIndexUsableForReplicaIdentityFull(idxInfo);
+		containsLeftMostCol =
+			RemoteRelContainsLeftMostColumnOnIdx(idxInfo, attrmap);
+		index_close(idxRel, AccessShareLock);
+
+		/* Return the first eligible index found */
+		if (isUsableIdx && containsLeftMostCol)
+			return idxoid;
+	}
+
+	return InvalidOid;
+}
+
+/*
+ * Returns true if the index is usable for replica identity full. For details,
+ * see FindUsableIndexForReplicaIdentityFull.
+ */
+bool
+IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
+{
+	bool		is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+	bool		is_partial = (indexInfo->ii_Predicate != NIL);
+	bool		is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
+
+	return is_btree && !is_partial && !is_only_on_expression;
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+	Oid			idxoid;
+
+	idxoid = RelationGetReplicaIndex(rel);
+
+	if (!OidIsValid(idxoid))
+		idxoid = RelationGetPrimaryKeyIndex(rel);
+
+	return idxoid;
+}
+
+/*
+ * Given a relation and OID of an index, returns true if the index is relation's
+ * replica identity index or relation's primary key's index.
+ *
+ * Returns false otherwise.
+ */
+bool
+IsIdxSafeToSkipDuplicates(Relation rel, Oid idxoid)
+{
+	Assert(OidIsValid(idxoid));
+
+	return GetRelationIdentityOrPK(rel) == idxoid;
+}
+
+/*
+ * Returns the index oid if we can use an index for subscriber. Otherwise,
+ * returns InvalidOid.
+ */
+static Oid
+FindLogicalRepLocalIndex(Relation localrel, LogicalRepRelation *remoterel,
+						 AttrMap *attrMap)
+{
+	Oid			idxoid;
+
+	/*
+	 * We never need index oid for partitioned tables, always rely on leaf
+	 * partition's index.
+	 */
+	if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return InvalidOid;
+
+	/*
+	 * Simple case, we already have a primary key or a replica identity index.
+	 */
+	idxoid = GetRelationIdentityOrPK(localrel);
+	if (OidIsValid(idxoid))
+		return idxoid;
+
+	if (remoterel->replident == REPLICA_IDENTITY_FULL)
+	{
+		/*
+		 * We are looking for one more opportunity for using an index. If
+		 * there are any indexes defined on the local relation, try to pick a
+		 * suitable index.
+		 *
+		 * The index selection safely assumes that all the columns are going
+		 * to be available for the index scan given that remote relation has
+		 * replica identity full.
+		 *
+		 * Note that we are not using the planner to find the cheapest method
+		 * to scan the relation as that would require us to either use lower
+		 * level planner functions which would be a maintenance burden in the
+		 * long run or use the full-fledged planner which could cause
+		 * overhead.
+		 */
+		return FindUsableIndexForReplicaIdentityFull(localrel, attrMap);
+	}
+
+	return InvalidOid;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index c7d1734a17..10f9711972 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -400,12 +400,15 @@ static void apply_handle_insert_internal(ApplyExecutionData *edata,
 static void apply_handle_update_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
-										 LogicalRepTupleData *newtup);
+										 LogicalRepTupleData *newtup,
+										 Oid localindexoid);
 static void apply_handle_delete_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
-										 TupleTableSlot *remoteslot);
+										 TupleTableSlot *remoteslot,
+										 Oid localindexoid);
 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
 									LogicalRepRelation *remoterel,
+									Oid localidxoid,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
@@ -2350,24 +2353,6 @@ apply_handle_type(StringInfo s)
 	logicalrep_read_typ(s, &typ);
 }
 
-/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
-	Oid			idxoid;
-
-	idxoid = RelationGetReplicaIndex(rel);
-
-	if (!OidIsValid(idxoid))
-		idxoid = RelationGetPrimaryKeyIndex(rel);
-
-	return idxoid;
-}
-
 /*
  * Check that we (the subscription owner) have sufficient privileges on the
  * target relation to perform the given operation.
@@ -2627,7 +2612,7 @@ apply_handle_update(StringInfo s)
 								   remoteslot, &newtup, CMD_UPDATE);
 	else
 		apply_handle_update_internal(edata, edata->targetRelInfo,
-									 remoteslot, &newtup);
+									 remoteslot, &newtup, rel->localindexoid);
 
 	finish_edata(edata);
 
@@ -2648,7 +2633,8 @@ static void
 apply_handle_update_internal(ApplyExecutionData *edata,
 							 ResultRelInfo *relinfo,
 							 TupleTableSlot *remoteslot,
-							 LogicalRepTupleData *newtup)
+							 LogicalRepTupleData *newtup,
+							 Oid localindexoid)
 {
 	EState	   *estate = edata->estate;
 	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
@@ -2663,6 +2649,7 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 
 	found = FindReplTupleInLocalRel(estate, localrel,
 									&relmapentry->remoterel,
+									localindexoid,
 									remoteslot, &localslot);
 	ExecClearTuple(remoteslot);
 
@@ -2767,7 +2754,7 @@ apply_handle_delete(StringInfo s)
 								   remoteslot, NULL, CMD_DELETE);
 	else
 		apply_handle_delete_internal(edata, edata->targetRelInfo,
-									 remoteslot);
+									 remoteslot, rel->localindexoid);
 
 	finish_edata(edata);
 
@@ -2787,7 +2774,8 @@ apply_handle_delete(StringInfo s)
 static void
 apply_handle_delete_internal(ApplyExecutionData *edata,
 							 ResultRelInfo *relinfo,
-							 TupleTableSlot *remoteslot)
+							 TupleTableSlot *remoteslot,
+							 Oid localindexoid)
 {
 	EState	   *estate = edata->estate;
 	Relation	localrel = relinfo->ri_RelationDesc;
@@ -2799,7 +2787,7 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
-	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+	found = FindReplTupleInLocalRel(estate, localrel, remoterel, localindexoid,
 									remoteslot, &localslot);
 
 	/* If found delete it. */
@@ -2833,17 +2821,17 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, index or if needed, sequential scan.
  *
  * Local tuple, if found, is returned in '*localslot'.
  */
 static bool
 FindReplTupleInLocalRel(EState *estate, Relation localrel,
 						LogicalRepRelation *remoterel,
+						Oid localidxoid,
 						TupleTableSlot *remoteslot,
 						TupleTableSlot **localslot)
 {
-	Oid			idxoid;
 	bool		found;
 
 	/*
@@ -2854,12 +2842,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
 
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
-	idxoid = GetRelationIdentityOrPK(localrel);
-	Assert(OidIsValid(idxoid) ||
+	Assert(OidIsValid(localidxoid) ||
 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
 
-	if (OidIsValid(idxoid))
-		found = RelationFindReplTupleByIndex(localrel, idxoid,
+	if (OidIsValid(localidxoid))
+		found = RelationFindReplTupleByIndex(localrel, localidxoid,
 											 LockTupleExclusive,
 											 remoteslot, *localslot);
 	else
@@ -2960,7 +2947,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 
 		case CMD_DELETE:
 			apply_handle_delete_internal(edata, partrelinfo,
-										 remoteslot_part);
+										 remoteslot_part,
+										 part_entry->localindexoid);
 			break;
 
 		case CMD_UPDATE:
@@ -2980,6 +2968,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
 												&part_entry->remoterel,
+												part_entry->localindexoid,
 												remoteslot_part, &localslot);
 				if (!found)
 				{
@@ -3076,7 +3065,8 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 
 					/* DELETE old tuple found in the old partition. */
 					apply_handle_delete_internal(edata, partrelinfo,
-												 localslot);
+												 localslot,
+												 part_entry->localindexoid);
 
 					/* INSERT new tuple into the new partition. */
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 9c34054bb7..e9fc368cdb 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -13,6 +13,7 @@
 #define LOGICALRELATION_H
 
 #include "access/attmap.h"
+#include "catalog/index.h"
 #include "replication/logicalproto.h"
 
 typedef struct LogicalRepRelMapEntry
@@ -31,6 +32,7 @@ typedef struct LogicalRepRelMapEntry
 	Relation	localrel;		/* relcache entry (NULL when closed) */
 	AttrMap    *attrmap;		/* map of local attributes to remote ones */
 	bool		updatable;		/* Can apply updates/deletes? */
+	Oid			localindexoid;  /* which index to use, or InvalidOid if none */
 
 	/* Sync state. */
 	char		state;
@@ -46,5 +48,8 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r
 														Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
+extern bool IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo);
+extern Oid GetRelationIdentityOrPK(Relation rel);
+extern bool IsIdxSafeToSkipDuplicates(Relation rel, Oid idxoid);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 3db0fdfd96..f85bf92b6f 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -38,6 +38,7 @@ tests += {
       't/029_on_error.pl',
       't/030_origin.pl',
       't/031_column_list.pl',
+      't/032_subscribe_use_index.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl
new file mode 100644
index 0000000000..71e5be93ff
--- /dev/null
+++ b/src/test/subscription/t/032_subscribe_use_index.pl
@@ -0,0 +1,514 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# Test logical replication behavior with subscriber uses available index
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"wal_retrieve_retry_interval = 1ms");
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+my $result            = '';
+
+# =============================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH MULTIPLE ROWS AND COLUMNS
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert some initial data within the range 0-9 for x and y
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,30)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# delete 6 rows
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x IN (5, 6);");
+
+# update 6 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = 100, y = '200' WHERE x IN (1, 2);");
+
+# wait until the index is used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 12) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates 20 rows via index";
+
+# subscriber gets the missing table information
+$node_subscriber->safe_psql('postgres',
+	"ALTER SUBSCRIPTION tap_sub_rep_full REFRESH PUBLICATION");
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# delete 6 rows
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x IN (7, 8);");
+
+# update 6 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = 101, y = '201' WHERE x IN (3, 4);");
+
+# wait until the index is used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 24) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates 20 rows via index";
+
+# make sure that the subscriber has the correct data after the update UPDATE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE (x = 100 and y = '200')");
+is($result, qq(6), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data after the first DELETE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x in (5, 6);");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data after the second UPDATE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE x = 101 and y = '201'");
+is($result, qq(6), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data after the second DELETE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x in (7, 8);");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH MULTIPLE ROWS AND COLUMNS
+# =============================================================================
+
+# =============================================================================
+# Testcase start: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)"
+);
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,100)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# update rows, moving them to other partitions
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4;");
+
+# delete rows from different partitions
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12;");
+
+# wait until the index is used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=3 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates partitioned table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(user_id+value_1+value_2) from users_table_part;");
+is($result, qq(10907), 'ensure subscriber has the correct data at the end of the test');
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(DISTINCT(user_id,value_1, value_2)) from users_table_part;");
+is($result, qq(99), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+# =============================================================================
+
+# =============================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS OR
+# PARTIAL INDEX
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+
+# index with only an expression
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names_expr_only ON people ((firstname || ' ' || lastname));");
+
+# partial index
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names_partial ON people(firstname) WHERE (firstname = 'first_name_1');");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'no-name' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'no-name' WHERE firstname = 'first_name_2' AND lastname = 'last_name_2';");
+
+# make sure none of the indexes is used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(idx_scan) from pg_stat_all_indexes where indexrelname IN ('people_names_expr_only', 'people_names_partial')");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with index on expressions');
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_3';");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4';");
+
+# make sure the index is not used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(idx_scan) from pg_stat_all_indexes where indexrelname IN ('people_names_expr_only', 'people_names_partial')");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with index on expressions');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(199), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS OR
+# PARTIAL INDEX
+# =============================================================================
+
+# =============================================================================
+# Testcase start: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS,
+# ALSO DROPPING/RECREATING INDEXES WORKS JUST FINE
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 20)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# update 1 row
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'no-name' WHERE firstname = 'first_name_1';");
+
+# delete the updated row
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'no-name';");
+
+# wait until the index is used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes two rows via index scan with index on expressions and columns";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(20), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people WHERE firstname = 'no-name';");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# now, drop the index with the expression, and re-create index on column lastname
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX people_names");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_last_names ON people(lastname)");
+
+# delete 1 row
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE lastname = 'last_name_18';");
+
+# wait until the index is used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'people_last_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full deletes 1 row via index scan on people_last_names";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people WHERE lastname = 'last_name_18';");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS,
+# ALSO DROPPING/RECREATING INDEXES WORKS JUST FINE
+# =============================================================================
+
+# =============================================================================
+# Testcase start: SOME NULL VALUES AND MISSING COLUMN
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# load some data, and update 2 tuples
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full VALUES (1), (2), (3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1;");
+
+# check if the index is used even when the index has NULL values
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates test_replica_id_full table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(x) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(7), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(3), 'ensure subscriber has the correct data at the end of the test');
+
+# drop the old index and create a new one on the other column
+$result = $node_subscriber->safe_psql('postgres',
+	"DROP INDEX test_replica_id_full_idx;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idy ON test_replica_id_full(y)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"TRUNCATE test_replica_id_full;");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# now, we update only 1 row on the publisher and expect
+# the subscriber to only update 1 row via sequential scan
+# as the leftmost column of the index does not exist on the pub
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+
+$node_publisher->wait_for_catchup($appname);
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT sum(x) FROM test_replica_id_full");
+is($result, qq(232), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SOME NULL VALUES AND MISSING COLUMN
+# =============================================================================
+
+# =============================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH PUB/SUB DIFFERENT DATA VIA
+# A UNIQUE INDEX THAT IS NOT PRIMARY KEY OR REPLICA IDENTITY
+#
+# The subscriber has duplicate tuples that publisher does not have.
+# When publsher updates/deletes 1 row, subscriber uses indexes and
+# updates/deletes exactly 1 row.
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_idxy ON test_replica_id_full(x,y)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i,i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# duplicate the data in subscriber for y column
+$node_subscriber->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i+100, i FROM generate_series(0,21)i;");
+
+# now, we update only 1 row on the publisher and expect
+# the subscriber to only update 1 row although there are
+# two tuples with y = 15 on the subscriber
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = 2000 WHERE y = 15;");
+
+# wait until the index is used on the subscriber
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idxy';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full updates one row via index";
+
+# make sure that the subscriber has the correct data
+# we only updated 1 row
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full WHERE x = 2000");
+is($result, qq(1), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH PUB/SUB DIFFERENT DATA VIA
+# A UNIQUE INDEX THAT IS NOT PRIMARY KEY OR REPLICA IDENTITY
+# =============================================================================
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.34.1

Reply via email to