Hi Amit, all

> Few comments on 0001
> ====================
> 1.
> +   such suitable indexes, the search on the subscriber s ide can be
> very inefficient,
>
> unnecessary space in 'side'
>

Fixed in v28


>
> 2.
> -   identity.  If the table does not have any suitable key, then it can be
> set
> -   to replica identity <quote>full</quote>, which means the entire row
> becomes
> -   the key.  This, however, is very inefficient and should only be used
> as a
> -   fallback if no other solution is possible.  If a replica identity other
> +   identity. When replica identity <literal>FULL</literal> is specified,
> +   indexes can be used on the subscriber side for searching the rows.
>
> I think it is better to retain the first sentence (If the table does
> not ... entire row becomes the key.) as that says what will be part of
> the key.
>
>
right, that sentence looks useful, added back.


> 3.
> -   comprising the same or fewer columns must also be set on the subscriber
> -   side.  See <xref linkend="sql-altertable-replica-identity"/> for
> details on
> -   how to set the replica identity.  If a table without a replica
> identity is
> -   added to a publication that replicates <command>UPDATE</command>
> +   comprising the same or fewer columns must also be set on the
> subscriber side.
> +   See <xref linkend="sql-altertable-replica-identity"/> for
> +   details on how to set the replica identity.  If a table without a
> replica
> +   identity is added to a publication that replicates
> <command>UPDATE</command>
>
> I don't see any change in this except line length. If so, I don't
> think we should change it as part of this patch.
>
>
Yes, fixed. But the first line (starting with See <xref ..) still shows up
as changed, probably because the line wrapping around it has changed. I
couldn't get the diff to show that line as unchanged.


> 4.
>  /*
>   * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key'
> that
>   * is setup to match 'rel' (*NOT* idxrel!).
>   *
> - * Returns whether any column contains NULLs.
> + * Returns how many columns should be used for the index scan.
> + *
> + * This is not generic routine, it expects the idxrel to be
> + * a btree, non-partial and have at least one column
> + * reference (e.g., should not consist of only expressions).
>   *
> - * This is not generic routine, it expects the idxrel to be replication
> - * identity of a rel and meet all limitations associated with that.
> + * By definition, replication identity of a rel meets all
> + * limitations associated with that. Note that any other
> + * index could also meet these limitations.
>
> The comment changes look quite asymmetric to me. Normally, we break
> the line if the line length goes beyond 80 cols. Please check and
> change other places in the patch if they have a similar symptom.
>

Went over the patch and expanded the lines to about 80 characters.

I'm assuming 80 is not a hard limit; in a few cases the lines run to 81-82
characters.


>
> 5.
> + * There are no fundamental problems for supporting non-btree
> + * and/or partial indexes.
>
> Can we mention partial indexes in the above comment? It seems to me
> that because the required tuple may not satisfy the expression (in the
> case of partial indexes) it may not be easy to support it.
>

Expanded the comment and explained the differences a little further.


>
> 6.
> build_replindex_scan_key()
> {
> ...
> + for (index_attoff = 0; index_attoff <
> IndexRelationGetNumberOfKeyAttributes(idxrel);
> + index_attoff++)
> ...
> ...
> +#ifdef USE_ASSERT_CHECKING
> + IndexInfo  *indexInfo = BuildIndexInfo(idxrel);
> +
> + Assert(!IsIndexOnlyOnExpression(indexInfo));
> +#endif
> ...
> }
>
> We can avoid building index info multiple times. This can be either
> checked at the beginning of the function outside attribute offset loop
> or we can probably cache it. I understand this is for assert builds
> but seems easy to avoid it doing multiple times and it also looks odd
> to do it multiple times for the same index.
>

Applied your suggestions. Although I do not have strong opinions, I think
the code was easier to follow when the indexInfo was built on each
iteration.


>
> 7.
> - /* Build scankey for every attribute in the index. */
> - for (attoff = 0; attoff <
> IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
> + /* Build scankey for every non-expression attribute in the index. */
> + for (index_attoff = 0; index_attoff <
> IndexRelationGetNumberOfKeyAttributes(idxrel);
> + index_attoff++)
>   {
>   Oid operator;
>   Oid opfamily;
> + Oid optype = get_opclass_input_type(opclass->values[index_attoff]);
>   RegProcedure regop;
> - int pkattno = attoff + 1;
> - int mainattno = indkey->values[attoff];
> - Oid optype = get_opclass_input_type(opclass->values[attoff]);
> + int table_attno = indkey->values[index_attoff];
>
> I don't think here we need to change variable names if we retain
> mainattno as it is instead of changing it to table_attno. The current
> naming doesn't seem bad for the current usage of the patch.
>

Hmm, I'm actually not convinced that the variable naming on HEAD is good for
the current patch. The main difference is that now we allow indexes like:

   CREATE INDEX idx ON table(foo(col), col_2)

(See # Testcase start: SUBSCRIPTION CAN USE INDEXES WITH
EXPRESSIONS AND COLUMNS)

For such indexes, we skip the expression attributes of the index when
building the scan keys. So, skey_attoff is not necessarily equal to
index_attoff anymore. Calling this out explicitly via the variable names
seems more robust to me. Plus, mainattno sounded vague to me when I first
read this function.

So, unless you have strong objections, I'm leaning towards keeping the
variable names more explicit. I'm also open to suggestions if you think the
names I picked are not clear enough.


>
> 8.
> + TypeCacheEntry **eq = NULL; /* only used when the index is not RI or PK
> */
>
> Normally, we don't add such comments as the usage is quite obvious by
> looking at the code.
>
>
Sure, I also don't see much value in it, removed.

Attached v29 for this review. Note that I'll be working on the disable
index scan changes after I reply to some of the other pending reviews.

Thanks,
Onder
From 5fa22096985ddc4c1c4804554da595db1b70d057 Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Wed, 22 Feb 2023 14:12:56 +0300
Subject: [PATCH v29] Use indexes on the subscriber when REPLICA IDENTITY is
 full on the publisher

Using `REPLICA IDENTITY FULL` on the publication leads to a full table
scan per tuple change on the subscription. This makes `REPLICA
IDENTITY FULL` impractical, probably for all but a small number of
use cases.

With this patch, I'm proposing the following change: If there is any
suitable index on the subscriber, let the apply worker use it. The index
should be a btree index, not a partial index, and it should have at
least one column reference (i.e., cannot consist of only expressions).

There is no smart mechanism to pick the index. If there is more than
one index that satisfies these requirements, we just pick the first
one.

The majority of the logic on the subscriber side already exists in
the code. The subscriber is already capable of doing (unique) index
scans.  With this patch, we are allowing the index scan to iterate over
the tuples fetched and only act when the tuples are equal. Anyone familiar
with this part of the code might recognize that the sequential scan
code on the subscriber already implements the `tuples_equal()`
function. In short, the changes on the subscriber are mostly
combining parts of (unique) index scan and sequential scan codes.

From the performance point of view, there are a few things to note.
First, the patch aims not to change the behavior when PRIMARY KEY
or UNIQUE INDEX is used. Second, when REPLICA IDENTITY FULL is on
the publisher and an index is used on the subscriber, the
difference mostly comes down to `index scan` vs `sequential scan`.
That's why it is hard to quote a specific improvement figure: it
mostly depends on the data size, the index and the data distribution.

Still, below I try to showcase the potential improvements using an
index on the subscriber `pgbench_accounts(bid)`. With the index,
all the changes are replicated within ~5 seconds. When the index
is dropped, the same operation takes ~300 seconds.

// init source db
pgbench -i -s 100 -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 5432 postgres
psql -c "CREATE INDEX i1 ON pgbench_accounts(aid);" -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;" -p 5432 postgres
psql -c "CREATE PUBLICATION pub_test_1 FOR TABLE pgbench_accounts;" -p 5432 postgres

// init target db, drop existing primary key
pgbench -i -p 9700 postgres
psql -c "TRUNCATE pgbench_accounts;" -p 9700 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 9700 postgres
psql -c "CREATE SUBSCRIPTION sub_test_1 CONNECTION 'host=localhost port=5432 user=onderkalaci dbname=postgres' PUBLICATION pub_test_1;" -p 9700 postgres

// create one index, even on a low cardinality column
psql -c "CREATE INDEX i2 ON pgbench_accounts(bid);" -p 9700 postgres

// now, run some pgbench tests and observe replication
pgbench -t 500 -b tpcb-like -p 5432 postgres
---
 doc/src/sgml/logical-replication.sgml         |  22 +-
 src/backend/executor/execReplication.c        | 134 ++-
 src/backend/replication/logical/relation.c    | 184 ++++
 src/backend/replication/logical/worker.c      |  68 +-
 src/include/replication/logicalrelation.h     |   5 +
 src/test/subscription/meson.build             |   1 +
 .../subscription/t/032_subscribe_use_index.pl | 936 ++++++++++++++++++
 7 files changed, 1285 insertions(+), 65 deletions(-)
 create mode 100644 src/test/subscription/t/032_subscribe_use_index.pl

diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 1bd5660c87..f519ae8c54 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -124,19 +124,27 @@
   </para>
 
   <para>
-   A published table must have a <quote>replica identity</quote> configured in
+   A published table must have a <firstterm>replica identity</firstterm> configured in
    order to be able to replicate <command>UPDATE</command>
    and <command>DELETE</command> operations, so that appropriate rows to
    update or delete can be identified on the subscriber side.  By default,
    this is the primary key, if there is one.  Another unique index (with
    certain additional requirements) can also be set to be the replica
-   identity.  If the table does not have any suitable key, then it can be set
+   identity. If the table does not have any suitable key, then it can be set
    to replica identity <quote>full</quote>, which means the entire row becomes
-   the key.  This, however, is very inefficient and should only be used as a
-   fallback if no other solution is possible.  If a replica identity other
-   than <quote>full</quote> is set on the publisher side, a replica identity
-   comprising the same or fewer columns must also be set on the subscriber
-   side.  See <xref linkend="sql-altertable-replica-identity"/> for details on
+   the key. When replica identity <literal>FULL</literal> is specified,
+   indexes can be used on the subscriber side for searching the rows. These
+   indexes should be btree, non-partial and have at least one column
+   reference (e.g., should not consist of only expressions). These restrictions
+   on the non-unique index properties are in essence the same restrictions that
+   are enforced for primary keys. Internally, we follow the same approach for
+   supporting index scans within logical replication scope. If there are no
+   such suitable indexes, the search on the subscriber side can be very inefficient,
+   therefore replica identity <literal>FULL</literal> should only be used as a
+   fallback if no other solution is possible. If a replica identity other
+   than <literal>full</literal> is set on the publisher side, a replica identity
+   comprising the same or fewer columns must also be set on the subscriber side.
+   See <xref linkend="sql-altertable-replica-identity"/> for details on
    how to set the replica identity.  If a table without a replica identity is
    added to a publication that replicates <command>UPDATE</command>
    or <command>DELETE</command> operations then
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index c484f5c301..f0172fb3a9 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -25,6 +25,7 @@
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_relation.h"
 #include "parser/parsetree.h"
+#include "replication/logicalrelation.h"
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -37,49 +38,89 @@
 #include "utils/typcache.h"
 
 
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+						 TypeCacheEntry **eq);
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
- * Returns whether any column contains NULLs.
+ * Returns how many columns should be used for the index scan.
+ *
+ * This is not generic routine, it expects the idxrel to be a btree, non-partial
+ * and have at least one column reference (e.g., should not consist of only
+ * expressions).
  *
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * By definition, replication identity of a rel meets all limitations associated
+ * with that. Note that any other index could also meet these limitations.
  */
-static bool
+static int
 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 						 TupleTableSlot *searchslot)
 {
-	int			attoff;
+	int			index_attoff;
+	int			skey_attoff = 0;
 	bool		isnull;
 	Datum		indclassDatum;
 	oidvector  *opclass;
 	int2vector *indkey = &idxrel->rd_index->indkey;
-	bool		hasnulls = false;
-
-	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
-		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
+	IndexInfo  *indexInfo PG_USED_FOR_ASSERTS_ONLY = NULL;
 
 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
 									Anum_pg_index_indclass, &isnull);
 	Assert(!isnull);
 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
 
-	/* Build scankey for every attribute in the index. */
-	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+	/* Build scankey for every non-expression attribute in the index. */
+	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+		 index_attoff++)
 	{
 		Oid			operator;
 		Oid			opfamily;
+		Oid			optype = get_opclass_input_type(opclass->values[index_attoff]);
 		RegProcedure regop;
-		int			pkattno = attoff + 1;
-		int			mainattno = indkey->values[attoff];
-		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
+		int			table_attno = indkey->values[index_attoff];
+
+		if (!AttributeNumberIsValid(table_attno))
+		{
+			/*
+			 * This attribute is an expression, and
+			 * FindUsableIndexForReplicaIdentityFull() was called earlier
+			 * when the index for subscriber was selected. There, the indexes
+			 * comprising *only* expressions have already been eliminated.
+			 *
+			 * We sanity check this now.
+			 */
+#ifdef USE_ASSERT_CHECKING
+			/* apply assertions only once for the input idxrel */
+			if (indexInfo == NULL)
+			{
+				indexInfo = BuildIndexInfo(idxrel);
+				Assert(!IsIndexOnlyOnExpression(indexInfo));
+
+				/*
+				 * Furthermore, because PK/RI can't include expressions we also
+				 * sanity check the index is neither of those kinds.
+				 */
+				Assert(!IdxIsRelationIdentityOrPK(rel, idxrel->rd_id));
+			}
+#endif
+
+			/*
+			 * XXX: For a non-primary/unique index with an additional
+			 * expression, we do not have to continue at this point. However,
+			 * the below code assumes the index scan is only done for simple
+			 * column references. If we can relax the assumption in the below
+			 * code-block, we can also remove the continue.
+			 */
+			continue;
+		}
 
 		/*
 		 * Load the operator info.  We need this to get the equality operator
 		 * function for the scan key.
 		 */
-		opfamily = get_opclass_family(opclass->values[attoff]);
+		opfamily = get_opclass_family(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
@@ -91,23 +132,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
-		ScanKeyInit(&skey[attoff],
-					pkattno,
+		ScanKeyInit(&skey[skey_attoff],
+					index_attoff + 1,
 					BTEqualStrategyNumber,
 					regop,
-					searchslot->tts_values[mainattno - 1]);
+					searchslot->tts_values[table_attno - 1]);
 
-		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+		skey[skey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 
 		/* Check for null value. */
-		if (searchslot->tts_isnull[mainattno - 1])
-		{
-			hasnulls = true;
-			skey[attoff].sk_flags |= SK_ISNULL;
-		}
+		if (searchslot->tts_isnull[table_attno - 1])
+			skey[skey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
+
+		skey_attoff++;
 	}
 
-	return hasnulls;
+	/* There should always be at least one attribute for the index scan. */
+	Assert(skey_attoff > 0);
+
+	return skey_attoff;
 }
 
 /*
@@ -123,33 +166,54 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 							 TupleTableSlot *outslot)
 {
 	ScanKeyData skey[INDEX_MAX_KEYS];
+	int			skey_attoff;
 	IndexScanDesc scan;
 	SnapshotData snap;
 	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
+	TypeCacheEntry **eq = NULL;
+	bool		idxIsRelationIdentityOrPK;
 
 	/* Open the index. */
 	idxrel = index_open(idxoid, RowExclusiveLock);
 
-	/* Start an index scan. */
+	idxIsRelationIdentityOrPK = IdxIsRelationIdentityOrPK(rel, idxoid);
+
 	InitDirtySnapshot(snap);
-	scan = index_beginscan(rel, idxrel, &snap,
-						   IndexRelationGetNumberOfKeyAttributes(idxrel),
-						   0);
 
 	/* Build scan key. */
-	build_replindex_scan_key(skey, rel, idxrel, searchslot);
+	skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+	/* Start an index scan. */
+	scan = index_beginscan(rel, idxrel, &snap, skey_attoff, 0);
 
 retry:
 	found = false;
 
-	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+	index_rescan(scan, skey, skey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
-	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 	{
-		found = true;
+		/*
+		 * Avoid expensive equality check if the index is primary key or
+		 * replica identity index.
+		 */
+		if (!idxIsRelationIdentityOrPK)
+		{
+			/*
+			 * We only need to allocate once. This is allocated within per
+			 * tuple context -- ApplyMessageContext -- hence no need to
+			 * explicitly pfree().
+			 */
+			if (eq == NULL)
+				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+
+			if (!tuples_equal(outslot, searchslot, eq))
+				continue;
+		}
+
 		ExecMaterializeSlot(outslot);
 
 		xwait = TransactionIdIsValid(snap.xmin) ?
@@ -164,6 +228,10 @@ retry:
 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 			goto retry;
 		}
+
+		/* Found our tuple and it's not locked */
+		found = true;
+		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 55bfa07871..fb0531b560 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,11 +17,14 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_am_d.h"
 #include "catalog/pg_subscription_rel.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
+#include "optimizer/cost.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
 #include "utils/inval.h"
@@ -50,6 +53,9 @@ typedef struct LogicalRepPartMapEntry
 	LogicalRepRelMapEntry relmapentry;
 } LogicalRepPartMapEntry;
 
+static Oid	FindLogicalRepUsableIndex(Relation localrel,
+									  LogicalRepRelation *remoterel);
+
 /*
  * Relcache invalidation callback for our relation map cache.
  */
@@ -439,6 +445,14 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		 */
 		logicalrep_rel_mark_updatable(entry);
 
+		/*
+		 * Finding a usable index is an infrequent task. It occurs when an
+		 * operation is first performed on the relation, or after invalidation
+		 * of the relation cache entry (such as ANALYZE or CREATE/DROP index
+		 * on the relation).
+		 */
+		entry->usableIndexOid = FindLogicalRepUsableIndex(entry->localrel, remoterel);
+
 		entry->localrelvalid = true;
 	}
 
@@ -697,6 +711,14 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	/* Set if the table's replica identity is enough to apply update/delete. */
 	logicalrep_rel_mark_updatable(entry);
 
+	/*
+	 * Finding a usable index is an infrequent task. It occurs when an
+	 * operation is first performed on the relation, or after invalidation of
+	 * the relation cache entry (such as ANALYZE or CREATE/DROP index on the
+	 * relation).
+	 */
+	entry->usableIndexOid = FindLogicalRepUsableIndex(partrel, remoterel);
+
 	entry->localrelvalid = true;
 
 	/* state and statelsn are left set to 0. */
@@ -704,3 +726,165 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 
 	return entry;
 }
+
+/*
+ * Returns true if the given index consists only of expressions such as:
+ * 	CREATE INDEX idx ON table(foo(col));
+ *
+ * Returns false even if there is one column reference:
+ * 	 CREATE INDEX idx ON table(foo(col), col_2);
+ */
+bool
+IsIndexOnlyOnExpression(IndexInfo *indexInfo)
+{
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		AttrNumber	attnum = indexInfo->ii_IndexAttrNumbers[i];
+
+		if (AttributeNumberIsValid(attnum))
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * Returns the oid of an index that can be used via the apply worker. The index
+ * should be btree, non-partial and have at least one column reference (e.g.,
+ * should not consist of only expressions). The limitations arise from
+ * RelationFindReplTupleByIndex(), which is designed to handle PK/RI and these
+ * limitations are inherent to PK/RI.
+ *
+ * If no suitable index is found, returns InvalidOid.
+ *
+ * XXX: There are no fundamental problems for supporting non-btree indexes. We
+ * should mostly relax the limitations in RelationFindReplTupleByIndex(). For
+ * partial indexes, the required changes likely to be larger. If none of the
+ * tuples satisfy the expression for the index scan, we should fall-back to
+ * sequential execution, which might not be a good idea in some cases.
+ *
+ * Note that this is not a generic function, it expects REPLICA IDENTITY FULL
+ * for the remote relation.
+ */
+static Oid
+FindUsableIndexForReplicaIdentityFull(Relation localrel)
+{
+	Oid			usableIndex = InvalidOid;
+	List	   *indexlist;
+	ListCell   *lc;
+
+	/* Get index list of the local relation */
+	indexlist = RelationGetIndexList(localrel);
+	Assert(indexlist != NIL);
+
+	foreach(lc, indexlist)
+	{
+		Oid			idxoid = lfirst_oid(lc);
+		Relation	indexRelation = index_open(idxoid, AccessShareLock);
+		IndexInfo  *indexInfo = BuildIndexInfo(indexRelation);
+
+		bool		is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+		bool		is_partial = (indexInfo->ii_Predicate != NIL);
+		bool		is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
+
+		index_close(indexRelation, AccessShareLock);
+
+		if (is_btree && !is_partial && !is_only_on_expression)
+		{
+			/* we found one eligible index, don't need to continue */
+			usableIndex = idxoid;
+			break;
+		}
+	}
+
+	return usableIndex;
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+	Oid			idxoid;
+
+	idxoid = RelationGetReplicaIndex(rel);
+
+	if (!OidIsValid(idxoid))
+		idxoid = RelationGetPrimaryKeyIndex(rel);
+
+	return idxoid;
+}
+
+/*
+ * Given a relation and OID of an index, returns true if the index is relation's
+ * replica identity index or relation's primary key's index.
+ *
+ * Returns false otherwise.
+ */
+bool
+IdxIsRelationIdentityOrPK(Relation rel, Oid idxoid)
+{
+	Assert(OidIsValid(idxoid));
+
+	return GetRelationIdentityOrPK(rel) == idxoid;
+}
+
+/*
+ * Returns the index oid if we can use an index for subscriber. Otherwise,
+ * returns InvalidOid.
+ */
+static Oid
+FindLogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel)
+{
+	Oid			idxoid;
+
+	/*
+	 * We never need index oid for partitioned tables, always rely on leaf
+	 * partition's index.
+	 */
+	if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return InvalidOid;
+
+	/*
+	 * Simple case, we already have a primary key or a replica identity index.
+	 */
+	idxoid = GetRelationIdentityOrPK(localrel);
+	if (OidIsValid(idxoid))
+		return idxoid;
+
+	/*
+	 * If index scans are disabled, use a sequential scan.
+	 *
+	 * Note that we do not use index scans below when enable_indexscan is
+	 * false. Allowing primary key or replica identity even when index scan is
+	 * disabled is the legacy behaviour. So we hesitate to move the below
+	 * enable_indexscan check to be done earlier in this function.
+	 */
+	if (!enable_indexscan)
+		return InvalidOid;
+
+	if (remoterel->replident == REPLICA_IDENTITY_FULL &&
+		RelationGetIndexList(localrel) != NIL)
+	{
+		/*
+		 * If we had a primary key or relation identity with a unique index,
+		 * we would have already found and returned that oid. At this point,
+		 * the remote relation has replica identity full and we have at least
+		 * one local index defined.
+		 *
+		 * We are looking for one more opportunity for using an index. If
+		 * there are any indexes defined on the local relation, try to pick a
+		 * suitable index.
+		 *
+		 * The index selection safely assumes that all the columns are going
+		 * to be available for the index scan given that remote relation has
+		 * replica identity full.
+		 */
+		return FindUsableIndexForReplicaIdentityFull(localrel);
+	}
+
+	return InvalidOid;
+}
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cfb2ab6248..e9569e59d3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -397,6 +397,8 @@ static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
+static Oid	get_usable_indexoid(ApplyExecutionData *edata,
+								ResultRelInfo *relinfo);
 static void apply_handle_update_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
@@ -406,6 +408,7 @@ static void apply_handle_delete_internal(ApplyExecutionData *edata,
 										 TupleTableSlot *remoteslot);
 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
 									LogicalRepRelation *remoterel,
+									Oid localidxoid,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
@@ -2348,24 +2351,6 @@ apply_handle_type(StringInfo s)
 	logicalrep_read_typ(s, &typ);
 }
 
-/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
-	Oid			idxoid;
-
-	idxoid = RelationGetReplicaIndex(rel);
-
-	if (!OidIsValid(idxoid))
-		idxoid = RelationGetPrimaryKeyIndex(rel);
-
-	return idxoid;
-}
-
 /*
  * Check that we (the subscription owner) have sufficient privileges on the
  * target relation to perform the given operation.
@@ -2655,12 +2640,14 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	TupleTableSlot *localslot;
 	bool		found;
 	MemoryContext oldctx;
+	Oid			usableIndexOid = get_usable_indexoid(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
 	found = FindReplTupleInLocalRel(estate, localrel,
 									&relmapentry->remoterel,
+									usableIndexOid,
 									remoteslot, &localslot);
 	ExecClearTuple(remoteslot);
 
@@ -2793,11 +2780,12 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EPQState	epqstate;
 	TupleTableSlot *localslot;
 	bool		found;
+	Oid			usableIndexOid = get_usable_indexoid(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
-	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
+	found = FindReplTupleInLocalRel(estate, localrel, remoterel, usableIndexOid,
 									remoteslot, &localslot);
 
 	/* If found delete it. */
@@ -2828,20 +2816,50 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualEnd(&epqstate);
 }
 
+/*
+ * Decide whether we can pick an index for the relinfo (i.e the relation) we're
+ * actually deleting/updating from. If it is a child partition of
+ * edata->targetRelInfo, find the index on the partition.
+ *
+ * Note that if the corresponding relmapentry has invalid usableIndexOid, the
+ * function returns InvalidOid.
+ */
+static Oid
+get_usable_indexoid(ApplyExecutionData *edata, ResultRelInfo *relinfo)
+{
+	ResultRelInfo *targetResultRelInfo = edata->targetRelInfo;
+	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+	char		targetrelkind = targetResultRelInfo->ri_RelationDesc->rd_rel->relkind;
+
+	if (targetrelkind == RELKIND_PARTITIONED_TABLE)
+	{
+		/* Target is a partitioned table, so find relmapentry of the partition */
+		TupleConversionMap *map = ExecGetRootToChildMap(relinfo, edata->estate);
+		AttrMap    *attrmap = map ? map->attrMap : NULL;
+
+		relmapentry =
+			logicalrep_partition_open(relmapentry, relinfo->ri_RelationDesc,
+									  attrmap);
+	}
+
+	return relmapentry->usableIndexOid;
+}
+
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, index or if needed, sequential scan.
  *
  * Local tuple, if found, is returned in '*localslot'.
  */
 static bool
 FindReplTupleInLocalRel(EState *estate, Relation localrel,
 						LogicalRepRelation *remoterel,
+						Oid localidxoid,
 						TupleTableSlot *remoteslot,
 						TupleTableSlot **localslot)
 {
-	Oid			idxoid;
 	bool		found;
 
 	/*
@@ -2852,12 +2870,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
 
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
-	idxoid = GetRelationIdentityOrPK(localrel);
-	Assert(OidIsValid(idxoid) ||
+	Assert(OidIsValid(localidxoid) ||
 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
 
-	if (OidIsValid(idxoid))
-		found = RelationFindReplTupleByIndex(localrel, idxoid,
+	if (OidIsValid(localidxoid))
+		found = RelationFindReplTupleByIndex(localrel, localidxoid,
 											 LockTupleExclusive,
 											 remoteslot, *localslot);
 	else
@@ -2978,6 +2995,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
 												&part_entry->remoterel,
+												part_entry->usableIndexOid,
 												remoteslot_part, &localslot);
 				if (!found)
 				{
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 9c34054bb7..379b09e9b8 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -13,6 +13,7 @@
 #define LOGICALRELATION_H
 
 #include "access/attmap.h"
+#include "catalog/index.h"
 #include "replication/logicalproto.h"
 
 typedef struct LogicalRepRelMapEntry
@@ -31,6 +32,7 @@ typedef struct LogicalRepRelMapEntry
 	Relation	localrel;		/* relcache entry (NULL when closed) */
 	AttrMap    *attrmap;		/* map of local attributes to remote ones */
 	bool		updatable;		/* Can apply updates/deletes? */
+	Oid			usableIndexOid; /* which index to use, or InvalidOid if none */
 
 	/* Sync state. */
 	char		state;
@@ -46,5 +48,8 @@ extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *r
 														Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
+extern bool IsIndexOnlyOnExpression(IndexInfo *indexInfo);
+extern Oid GetRelationIdentityOrPK(Relation rel);
+extern bool IdxIsRelationIdentityOrPK(Relation rel, Oid idxoid);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index 3db0fdfd96..f85bf92b6f 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -38,6 +38,7 @@ tests += {
       't/029_on_error.pl',
       't/030_origin.pl',
       't/031_column_list.pl',
+      't/032_subscribe_use_index.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl
new file mode 100644
index 0000000000..f7ae387c65
--- /dev/null
+++ b/src/test/subscription/t/032_subscribe_use_index.pl
@@ -0,0 +1,936 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test logical replication behavior when the subscriber uses an available index
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"wal_retrieve_retry_interval = 1ms");
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX
+#
+# Basic test where the subscriber uses index
+# and only updates 1 row and deletes
+# 1 other row
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to delete one row via index";
+
+# make sure that the subscriber has the correct data
+my $result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT x) FROM test_replica_id_full");
+is($result, qq(20), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CREATE/DROP INDEX WORKS WITHOUT ISSUES
+#
+# This test ensures that after CREATE INDEX, the subscriber can automatically
+# use one of the indexes (provided that it fulfils the requirements).
+# Similarly, after DROP index, the subscriber can automatically switch to
+# sequential scan
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL, y int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL, y int)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i, i FROM generate_series(0,2100)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# now, create index and see that the index is used
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# wait until the index is created
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for index test_replica_id_full_idx to be created";
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index";
+
+
+# now, create index on column y as well
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idy ON test_replica_id_full(y)");
+
+# wait until the index is created
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idy';}
+) or die "Timed out while waiting for index test_replica_id_full_idy to be created";
+
+# now, the update could use either the test_replica_id_full_idx or the test_replica_id_full_idy index;
+# it is not possible for the user to control which index is used
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET y = y + 1 WHERE y = 3000;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(idx_scan) = 2 from pg_stat_all_indexes where indexrelname IN ('test_replica_id_full_idy', 'test_replica_id_full_idx');}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index";
+
+# let's also test dropping test_replica_id_full_idy and
+# hence use test_replica_id_full_idx
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX test_replica_id_full_idy;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 25;");
+$node_publisher->wait_for_catchup($appname);
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full WHERE x = 15 OR x = 25 OR y = 3000;");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION CREATE/DROP INDEX WORKS WITHOUT ISSUES
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
+#
+# Basic test where the subscriber uses index
+# and updates 50 rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data within the range 0-19
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i%20 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 50 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 50) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update 50 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x = 15;");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX UPDATEs MULTIPLE ROWS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+#
+# Basic test where the subscriber uses index
+# and deletes 200 rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert some initial data within the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# deletes 200 rows
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x IN (5, 6);");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to delete 200 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x in (5, 6);");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+#
+# Basic test where the subscriber uses index
+# and updates multiple rows with a table that has
+# dropped columns
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert some initial data within the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 200 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update 200 rows via index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(x+y::int) from test_replica_id_full;");
+is($result, qq(9200), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)"
+);
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates rows and moves between partitions
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4;");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=10 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for updates on partitioned table with index";
+
+# deletes rows from different partitions
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12;");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=30 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for deletes on partitioned table with index";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(user_id+value_1+value_2) from users_table_part;");
+is($result, qq(550070), 'ensure subscriber has the correct data at the end of the test');
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(DISTINCT(user_id,value_1, value_2)) from users_table_part;");
+is($result, qq(981), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full_part_index REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_part_idx ON test_replica_id_full_part_index(x) WHERE (x = 5);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full_part_index SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full_part_index");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows, one of them is indexed
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 5;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# make sure that the index is not used
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with partial index');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full_part_index;");
+is($result, qq(22), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(DISTINCT x) FROM test_replica_id_full_part_index;");
+is($result, qq(20), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people ((firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_2' AND lastname = 'last_name_2';");
+
+# make sure the index is not used on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full updates two rows via seq. scan with index on expressions');
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_3';");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4';");
+
+$node_publisher->wait_for_catchup($appname);
+
+# make sure the index is not used on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'people_names'");
+is($result, qq(0), 'ensure subscriber tap_sub_rep_full deletes two rows via seq. scan with index on expressions');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(199), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_3' AND lastname = 'last_name_3';");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update two rows via index scan with index on expressions and columns";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'Nan';");
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to delete two rows via index scan with index on expressions and columns";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people;");
+is($result, qq(199), 'ensure subscriber has the correct data at the end of the test');
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM people WHERE firstname = 'Nan';");
+is($result, qq(0), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: Some NULL values
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full VALUES (1), (2), (3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 3;");
+
+# check if the index is used even when the index has NULL values
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update test_replica_id_full table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(x) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(8), 'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE y IS NULL;");
+is($result, qq(3), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Some NULL values
+# ====================================================================
+
+# ====================================================================
+# Testcase start: Unique index that is not primary key or replica identity
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique_idx ON test_replica_id_full(x);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full (x, y) VALUES (NULL, 1), (NULL, 2), (NULL, 3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = 1 WHERE y = 2;");
+
+# check if the index is used even when the index has NULL values
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update test_replica_id_full table";
+
+# make sure that the subscriber has the correct data
+$result = $node_subscriber->safe_psql('postgres',
+	"select sum(y) from test_replica_id_full;");
+is($result, qq(6), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Unique index that is not primary key or replica identity
+# ====================================================================
+
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+#
+# Even with enable_indexscan = false, we still use primary key and
+# replica identity indexes; this is the legacy behavior. However, we do
+# not use indexes that are neither primary key nor replica identity.
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int NOT NULL)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+$node_subscriber->safe_psql('postgres',
+	"ALTER SYSTEM SET enable_indexscan TO off;");
+$node_subscriber->safe_psql('postgres',
+	"SELECT pg_reload_conf();");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 10000 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# show that index is not used when enable_indexscan=false
+$result = $node_subscriber->safe_psql('postgres',
+	"select idx_scan from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx'");
+is($result, qq(0), 'ensure subscriber has not used index with enable_indexscan=false');
+
+# we are done with this index, drop to simplify the tests
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX test_replica_id_full_idx");
+
+# now, create a unique index and set it as the replica identity
+$node_publisher->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique ON test_replica_id_full(x);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE UNIQUE INDEX test_replica_id_full_unique ON test_replica_id_full(x);");
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY USING INDEX test_replica_id_full_unique;");
+
+# wait for the synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 10000 WHERE x = 14;");
+$node_publisher->wait_for_catchup($appname);
+
+# show that the unique index used as replica identity is used even when
+# enable_indexscan=false; this is the legacy behavior
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan=1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_unique'}
+) or die "Timed out while waiting for subscriber to use unique index as replica identity even with enable_indexscan=false";
+
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full WHERE x IN (14,15)");
+is($result, qq(0), 'ensure the results are accurate even with enable_indexscan=false');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"ALTER SYSTEM RESET enable_indexscan;");
+$node_subscriber->safe_psql('postgres',
+	"SELECT pg_reload_conf();");
+
+# Testcase end: SUBSCRIPTION BEHAVIOR WITH ENABLE_INDEXSCAN
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH PUB/SUB different data
+#
+# The subscriber has duplicate tuples that the publisher does not have.
+# When the publisher updates/deletes 1 row, the subscriber uses indexes
+# and updates/deletes exactly 1 row.
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+
+# duplicate the data in subscriber
+$node_subscriber->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+$node_publisher->wait_for_catchup($appname);
+
+# now, we update only 1 row on the publisher and expect
+# the subscriber to only update 1 row
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to update one row via index";
+
+# similarly for delete
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# wait until the index is used on the subscriber
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for subscriber tap_sub_rep_full to delete one row via index";
+
+# make sure that the subscriber has the correct data
+# we only deleted 1 row
+$result = $node_subscriber->safe_psql('postgres',
+	"SELECT count(*) FROM test_replica_id_full");
+is($result, qq(43), 'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH PUB/SUB different data
+# ====================================================================
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.34.1
