Hi again,

> ======
>
> 1. Commit message
>
> 1a.
> With this patch, I'm proposing the following change: If there is an
> index on the subscriber, use the index as long as the planner
> sub-modules picks any index over sequential scan. The index should be
> a btree index, not a partital index. Finally, the index should have at
> least one column reference (e.g., cannot consists of only
> expressions).
>
> SUGGESTION
> With this patch, I'm proposing the following change: If there is any
> index on the subscriber, let the planner sub-modules compare the costs
> of index versus sequential scan and choose the cheapest. The index
> should be a btree index, not a partial index, and it should have at
> least one column reference (e.g., cannot consist of only expressions).
>
>
makes sense.


> ~
>
> 1b.
> The Majority of the logic on the subscriber side exists in the code.
>
> "exists" -> "already exists"
>

fixed

>
> ~
>
> 1c.
> psql -c "truncate pgbench_accounts;" -p 9700 postgres
>
> "truncate" -> "TRUNCATE"
>

fixed


> ~
>
> 1d.
> Try to wrap this message text at 80 char width.
>

fixed


>
> ======
>
> 2. src/backend/replication/logical/relation.c - logicalrep_rel_open
>
> + /*
> + * Finding a usable index is an infrequent task. It is performed
> + * when an operation is first performed on the relation, or after
> + * invalidation of the relation cache entry (e.g., such as ANALYZE).
> + */
> + entry->usableIndexOid = LogicalRepUsableIndex(entry->localrel,
> remoterel);
>
> Seemed a bit odd to say "performed" 2x in the same sentence.
>
> "It is performed when..." -> "It occurs when..." (?)
>
>
fixed


> ~~~
>
> 3. src/backend/replication/logical/relation.c - logicalrep_partition_open
>
> + /*
> + * Finding a usable index is an infrequent task. It is performed
> + * when an operation is first performed on the relation, or after
> + * invalidation of the relation cache entry (e.g., such as ANALYZE).
> + */
> + part_entry->relmapentry.usableIndexOid =
> + LogicalRepUsableIndex(partrel, remoterel);
>
> 3a.
> Same as comment #2 above.
>

done


>
> ~
>
> 3b.
> The jumping between 'part_entry' and 'entry' is confusing. Since
> 'entry' is already assigned to be &part_entry->relmapentry can't you
> use that here?
>
> SUGGESTION
> entry->usableIndexOid = LogicalRepUsableIndex(partrel, remoterel);
>
Yes, sure, it makes sense.


> ~~~
>
> 4. src/backend/replication/logical/relation.c - GetIndexOidFromPath
>
> +/*
> + * Returns a valid index oid if the input path is an index path.
> + * Otherwise, return invalid oid.
> + */
> +static Oid
> +GetIndexOidFromPath(Path *path)
>
> Perhaps make this function comment more consistent with others (like
> GetRelationIdentityOrPK, LogicalRepUsableIndex) and refer to the
> InvalidOid.
>
> SUGGESTION
> /*
>  * Returns a valid index oid if the input path is an index path.
>  *
>  * Otherwise, returns InvalidOid.
>  */
>
Sounds good.


> ~~~
>
> 5. src/backend/replication/logical/relation.c - IndexOnlyOnExpression
>
> +bool
> +IndexOnlyOnExpression(IndexInfo *indexInfo)
> +{
> + int i;
> + for (i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
> + {
> + AttrNumber attnum = indexInfo->ii_IndexAttrNumbers[i];
> + if (AttributeNumberIsValid(attnum))
> + return false;
> + }
> +
> + return true;
> +}
>
> 5a.
> Add a blank line after those declarations.
>
>
Done, also went over all the functions and ensured we don't have this
anymore


> ~
>
> 5b.
> AFAIK the C99 style for loop declarations should be OK [1] for new
> code, so declaring like below would be cleaner:
>
> for (int i = 0; ...
>
Done

> ~~~
>
> 6. src/backend/replication/logical/relation.c -
> FilterOutNotSuitablePathsForReplIdentFull
>
> +/*
> + * Iterates over the input path list and returns another path list
> + * where paths with non-btree indexes, partial indexes or
> + * indexes on only expressions are eliminated from the list.
> + */
> +static List *
> +FilterOutNotSuitablePathsForReplIdentFull(List *pathlist)
>
> "are eliminated from the list." -> "have been removed."
>
Done


> ~~~
>
> 7.
>
> + foreach(lc, pathlist)
> + {
> + Path    *path = (Path *) lfirst(lc);
> + Oid indexOid = GetIndexOidFromPath(path);
> + Relation indexRelation;
> + IndexInfo *indexInfo;
> + bool is_btree;
> + bool is_partial;
> + bool is_only_on_expression;
> +
> + if (!OidIsValid(indexOid))
> + {
> + /* Unrelated Path, skip */
> + suitableIndexList = lappend(suitableIndexList, path);
> + }
> + else
> + {
> + indexRelation = index_open(indexOid, AccessShareLock);
> + indexInfo = BuildIndexInfo(indexRelation);
> + is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
> + is_partial = (indexInfo->ii_Predicate != NIL);
> + is_only_on_expression = IndexOnlyOnExpression(indexInfo);
> + index_close(indexRelation, NoLock);
> +
> + if (is_btree && !is_partial && !is_only_on_expression)
> + suitableIndexList = lappend(suitableIndexList, path);
> + }
> + }
>
> I think most of those variables are only used in the "else" block so
> maybe it's better to declare them at that scope.
>
> + Relation indexRelation;
> + IndexInfo *indexInfo;
> + bool is_btree;
> + bool is_partial;
> + bool is_only_on_expression;
>
>
Makes sense


> ~~~
>
> 8. src/backend/replication/logical/relation.c -
> GetCheapestReplicaIdentityFullPath
>
> + * Indexes that consists of only expressions (e.g.,
> + * no simple column references on the index) are also
> + * eliminated with a similar reasoning.
>
> "consists" -> "consist"
>
> "with a similar reasoning" -> "with similar reasoning"
>
fixed

> ~~~
>
> 9.
>
> + * We also eliminate non-btree indexes, which could be relaxed
> + * if needed. If we allow non-btree indexes, we should adjust
> + * RelationFindReplTupleByIndex() to support such indexes.
>
> This looks like another of those kinds of comments that should have
> "XXX" prefix as a note to the future.
>

added


>
> ~~~
>
> 10. src/backend/replication/logical/relation.c -
> FindUsableIndexForReplicaIdentityFull
>
> +/*
> + * Returns an index oid if the planner submodules picks index scans
> + * over sequential scan.
>
> 10a
> "picks" -> "pick"
>
>
done


> ~
>
> 10b.
> Maybe this should also say ", otherwise returns InvalidOid" (?)
>
>
Makes sense, added similar to above suggestion


> ~~~
>
> 11.
>
> +FindUsableIndexForReplicaIdentityFull(Relation localrel)
> +{
> + MemoryContext usableIndexContext;
> + MemoryContext oldctx;
> + Path *cheapest_total_path;
> + Oid indexOid;
>
> In the following function, and in the one after that, you've named the
> index Oid as 'idxoid' (not 'indexOid'). IMO it's better to use
> consistent naming everywhere.
>

 Ok, existing functions use idxoid, switched to that.

>
> ~~~
>
> 12. src/backend/replication/logical/relation.c - GetRelationIdentityOrPK
>
> 12a.
> I wondered what is the benefit of having this function. IIUC it is
> only called from one place (LogicalRepUsableIndex) and IMO the code
> would probably be easier if you just inline this logic in that
> function...
>
>
I just moved that from src/backend/replication/logical/worker.c, so
probably better not to remove it in this patch?

Tbh, I like the simplicity it provides.


> ~
>
> 12b.
> +/*
> + * Get replica identity index or if it is not defined a primary key.
> + *
> + * If neither is defined, returns InvalidOid
> + */
>
> If you want to keep the function for some reason (e.g. see #12a) then
> I thought the function comment could be better.
>
> SUGGESTION
> /*
>  * Returns OID of the relation's replica identity index, or OID of the
>  * relation's primary key index.
>  *
>  * If neither is defined, returns InvalidOid.
>  */
>
>
As I noted, I just moved this function. So, left as-is for now.


> ~~~
>
> 13. src/backend/replication/logical/relation.c - LogicalRepUsableIndex
>
> For some reason, I feel this function should be called
> FindLogicalRepUsableIndex (or similar), because it seems more
> consistent with the others which might return the Oid or might return
> InvalidOid...
>
>
Makes sense, changed


> ~~~
>
> 14.
>
> + /*
> + * Index scans are disabled, use sequential scan. Note that we do allow
> + * index scans when there is a primary key or unique index replica
> + * identity. That is the legacy behavior so we hesitate to move this check
> + * above.
> + */
>
> Perhaps a slight rephrasing of that comment?
>
> SUGGESTION
> If index scans are disabled, use a sequential scan.
>
> Note that we still allowed index scans above when there is a primary
> key or unique index replica identity, but that is the legacy behaviour
> (even when enable_indexscan is false), so we hesitate to move this
> enable_indexscan check to be done earlier in this function.
>
> ~~~
>

Sounds good, changed

>
> 15.
>
> + * If we had a primary key or relation identity with a unique index,
> + * we would have already found a valid oid. At this point, the remote
> + * relation has replica identity full and we have at least one local
> + * index defined.
>
> "would have already found a valid oid." -> "would have already found
> and returned that oid."
>

Done


>
> ======
>
> 16. src/backend/replication/logical/worker.c - usable_indexoid_internal
>
> +/*
> + * Decide whether we can pick an index for the relinfo (e.g., the
> relation)
> + * we're actually deleting/updating from. If it is a child partition of
> + * edata->targetRelInfo, find the index on the partition.
> + *
> + * Note that if the corresponding relmapentry has InvalidOid
> usableIndexOid,
> + * the function returns InvalidOid. In that case, the tuple is used via
> + * sequential execution.
> + */
> +static Oid
> +usable_indexoid_internal(ApplyExecutionData *edata, ResultRelInfo
> *relinfo)
>
> I am not sure this is the right place to be saying that last sentence
> ("In that case, the tuple is used via sequential execution.") because
> it's up to the *calling* code to decide what to do if InvalidOid is
> returned
>

 Right, for now this is true, but could change in the future. Removed.


> ======
>
> 17. src/include/replication/logicalrelation.h
>
> @ -31,20 +32,40 @@ typedef struct LogicalRepRelMapEntry
>   Relation localrel; /* relcache entry (NULL when closed) */
>   AttrMap    *attrmap; /* map of local attributes to remote ones */
>   bool updatable; /* Can apply updates/deletes? */
> + Oid usableIndexOid; /* which index to use? (Invalid when no index
> + * used) */
>
> SUGGESTION (for the comment)
> which index to use, or InvalidOid if none
>

makes sense


>
> ~~~
>
> 18.
>
> +/*
> + * Partition map (LogicalRepPartMap)
> + *
> + * When a partitioned table is used as replication target, replicated
> + * operations are actually performed on its leaf partitions, which
> requires
> + * the partitions to also be mapped to the remote relation.  Parent's
> entry
> + * (LogicalRepRelMapEntry) cannot be used as-is for all partitions,
> because
> + * individual partitions may have different attribute numbers, which means
> + * attribute mappings to remote relation's attributes must be maintained
> + * separately for each partition.
> + */
> +typedef struct LogicalRepPartMapEntry
>
> Something feels not quite right using the (unchanged) comment about
> the Partition map which was removed from where it was originally in
> relation.c.
>
> The reason I am unsure is that this comment is still referring to the
> "LogicalRepPartMap", which is not here but is declared static in
> relation.c. Maybe the quick/easy fix would be to just change the first
> line to say: "Partition map (see LogicalRepPartMap in relation.c)".
> OTOH, I'm not sure if some part of this comment still needs to be left
> in relation.c (??)
>
Hmm, I agree that we need an extra comment pointing to where this is used
(I followed something similar to your suggestion).

However, I also think it is nicer to keep this comment here, because in the
code base it seems more common to put such comments on the MapEntry rather
than on the Map itself, no?

Thanks,
Onder
From 2b04b9c786e3916f321f73673c0194469b9de8d5 Mon Sep 17 00:00:00 2001
From: Onder Kalaci <onderkalaci@gmail.com>
Date: Tue, 17 May 2022 10:47:39 +0200
Subject: [PATCH] Use indexes on the subscriber when REPLICA IDENTITY is full
 on the publisher

It is often not feasible to use `REPLICA IDENTITY FULL` on the publication,
because it leads to a full table scan per tuple change on the subscription.
This makes `REPLICA IDENTITY FULL` impractical for all but a small number
of use cases.

With this patch, I'm proposing the following change: If there is any
index on the subscriber, let the planner sub-modules compare the costs
of index versus sequential scan and choose the cheapest. The index
should be a btree index, not a partial index, and it should have at
least one column reference (e.g., cannot consist of only expressions).

The majority of the logic on the subscriber side already exists in
the code. The subscriber is already capable of doing (unique) index
scans.  With this patch, we allow the index scan to iterate over the
fetched tuples and act only when the tuples are equal. Those familiar
with this part of the code will recognize that the sequential scan
code on the subscriber already implements the `tuples_equal()`
function. In short, the changes on the subscriber mostly combine
parts of the (unique) index scan and sequential scan code.
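
As a rough illustration of the paragraph above, the following is a minimal,
self-contained sketch of scanning a non-unique index and accepting only the
candidate that matches in every column. It does not use PostgreSQL APIs: the
`Row` type, `rows_equal()` (standing in for `tuples_equal()`), and
`find_row_by_key()` are hypothetical names, and the linear loop only emulates
what an index would return for a given key.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified row; not a PostgreSQL TupleTableSlot. */
#define NCOLS 3
typedef struct Row
{
	int			cols[NCOLS];
} Row;

/* Full-row comparison, standing in for tuples_equal(). */
static bool
rows_equal(const Row *a, const Row *b)
{
	for (int i = 0; i < NCOLS; i++)
	{
		if (a->cols[i] != b->cols[i])
			return false;
	}
	return true;
}

/*
 * Emulates a scan over an index on column 'keycol': visit every row whose
 * key matches the search row, and accept only a row that is equal to the
 * search row in all columns. Returns the position of the match, or -1.
 */
static int
find_row_by_key(const Row *rows, size_t nrows, int keycol,
				const Row *search, bool index_is_unique)
{
	assert(keycol >= 0 && keycol < NCOLS);

	for (size_t i = 0; i < nrows; i++)
	{
		if (rows[i].cols[keycol] != search->cols[keycol])
			continue;			/* the index would not return this row */

		/* A match on a unique index needs no further equality check. */
		if (index_is_unique || rows_equal(&rows[i], search))
			return (int) i;
	}

	return -1;
}
```

With a non-unique key, several rows can share the key value, so the full-row
comparison is what picks the right one; with a unique index the comparison
can be skipped, which mirrors the "avoid expensive equality check" shortcut.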

The decision on whether to use an index (and which index) is mostly
derived from the planner infrastructure. The idea is that on the
subscriber we have all the columns. So, construct all the
`Path`s with the restrictions on all columns, such as
`col_1 = $1 AND col_2 = $2 ... AND col_n = $N`. Then, let the
planner sub-module -- `make_one_rel()` -- give us the relevant
index `Path`s. On top of that, add the sequential scan `Path` as
well. Finally, pick the cheapest `Path` among them.
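
The selection step can be sketched outside of PostgreSQL as follows. This is
only an illustration under simplifying assumptions: `CandidatePath`,
`path_is_eligible()`, and `pick_cheapest_index()` are hypothetical names, the
costs are plain numbers rather than planner estimates, and the sequential
scan is always considered as a fallback.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for a planner Path; not the PostgreSQL struct. */
typedef struct CandidatePath
{
	unsigned int index_id;		/* identifier of the index behind the path */
	double		total_cost;		/* estimated cost of using this path */
	bool		is_btree;
	bool		is_partial;
	bool		only_expressions;
} CandidatePath;

/* Only btree, non-partial indexes with a column reference qualify. */
static bool
path_is_eligible(const CandidatePath *p)
{
	return p->is_btree && !p->is_partial && !p->only_expressions;
}

/*
 * Returns the index_id of the cheapest eligible index path, or 0 when the
 * sequential scan (with cost 'seqscan_cost') is at least as cheap or no
 * index path is eligible.
 */
static unsigned int
pick_cheapest_index(const CandidatePath *paths, size_t npaths,
					double seqscan_cost)
{
	unsigned int best = 0;
	double		best_cost = seqscan_cost;

	assert(seqscan_cost >= 0.0);

	for (size_t i = 0; i < npaths; i++)
	{
		if (!path_is_eligible(&paths[i]))
			continue;

		if (paths[i].total_cost < best_cost)
		{
			best_cost = paths[i].total_cost;
			best = paths[i].index_id;
		}
	}

	return best;
}
```

Returning 0 here plays the role that InvalidOid plays in the patch: the
caller falls back to a sequential scan when no index path wins.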

From the performance point of view, there are a few things to note.
First, the patch aims not to change the behavior when PRIMARY KEY
or UNIQUE INDEX is used. Second, when REPLICA IDENTITY FULL is on
the publisher and an index is used on the subscriber, the
difference mostly comes down to `index scan` vs `sequential scan`.
That's why it is hard to claim a specific amount of improvement.
It mostly depends on the data size, the index, and the data distribution.

Still, below I try to showcase the potential improvements using an
index on the subscriber `pgbench_accounts(bid)`. With the index,
all the changes are replicated within ~5 seconds. When the index
is dropped, the same operation takes around 300 seconds.

// init source db
pgbench -i -s 100 -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 5432 postgres
psql -c "CREATE INDEX i1 ON pgbench_accounts(aid);" -p 5432 postgres
psql -c "ALTER TABLE pgbench_accounts REPLICA IDENTITY FULL;" -p 5432 postgres
psql -c "CREATE PUBLICATION pub_test_1 FOR TABLE pgbench_accounts;" -p 5432 postgres

// init target db, drop existing primary key
pgbench -i -p 9700 postgres
psql -c "TRUNCATE pgbench_accounts;" -p 9700 postgres
psql -c "ALTER TABLE pgbench_accounts DROP CONSTRAINT pgbench_accounts_pkey;" -p 9700 postgres
psql -c "CREATE SUBSCRIPTION sub_test_1 CONNECTION 'host=localhost port=5432 user=onderkalaci dbname=postgres' PUBLICATION pub_test_1;" -p 9700 postgres

// create one index, even on a low cardinality column
psql -c "CREATE INDEX i2 ON pgbench_accounts(bid);" -p 9700 postgres

// now, run some pgbench tests and observe replication
pgbench -t 500 -b tpcb-like -p 5432 postgres
---
 src/backend/executor/execReplication.c        | 120 ++-
 src/backend/replication/logical/relation.c    | 402 +++++++-
 src/backend/replication/logical/worker.c      |  87 +-
 src/include/replication/logicalrelation.h     |  24 +-
 .../subscription/t/032_subscribe_use_index.pl | 937 ++++++++++++++++++
 5 files changed, 1485 insertions(+), 85 deletions(-)
 create mode 100644 src/test/subscription/t/032_subscribe_use_index.pl

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 6014f2e248..66accacbe7 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -19,12 +19,18 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#ifdef USE_ASSERT_CHECKING
+#include "catalog/index.h"
+#endif
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parse_relation.h"
 #include "parser/parsetree.h"
+#ifdef USE_ASSERT_CHECKING
+#include "replication/logicalrelation.h"
+#endif
 #include "storage/bufmgr.h"
 #include "storage/lmgr.h"
 #include "utils/builtins.h"
@@ -37,28 +43,29 @@
 #include "utils/typcache.h"
 
 
+static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
+						 TypeCacheEntry **eq);
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
- * Returns whether any column contains NULLs.
+ * Returns how many columns should be used for the index scan.
  *
- * This is not generic routine, it expects the idxrel to be replication
- * identity of a rel and meet all limitations associated with that.
+ * This is not a generic routine; it expects the idxrel to be an index
+ * that the planner would choose if the searchslot includes all the columns
+ * (e.g., REPLICA IDENTITY FULL on the source).
  */
-static bool
+static int
 build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 						 TupleTableSlot *searchslot)
 {
-	int			attoff;
+	int			index_attoff;
+	int			scankey_attoff = 0;
 	bool		isnull;
 	Datum		indclassDatum;
 	oidvector  *opclass;
 	int2vector *indkey = &idxrel->rd_index->indkey;
-	bool		hasnulls = false;
-
-	Assert(RelationGetReplicaIndex(rel) == RelationGetRelid(idxrel) ||
-		   RelationGetPrimaryKeyIndex(rel) == RelationGetRelid(idxrel));
 
 	indclassDatum = SysCacheGetAttr(INDEXRELID, idxrel->rd_indextuple,
 									Anum_pg_index_indclass, &isnull);
@@ -66,20 +73,49 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 	opclass = (oidvector *) DatumGetPointer(indclassDatum);
 
 	/* Build scankey for every attribute in the index. */
-	for (attoff = 0; attoff < IndexRelationGetNumberOfKeyAttributes(idxrel); attoff++)
+	for (index_attoff = 0; index_attoff < IndexRelationGetNumberOfKeyAttributes(idxrel);
+		 index_attoff++)
 	{
 		Oid			operator;
 		Oid			opfamily;
 		RegProcedure regop;
-		int			pkattno = attoff + 1;
-		int			mainattno = indkey->values[attoff];
-		Oid			optype = get_opclass_input_type(opclass->values[attoff]);
+		int			table_attno = indkey->values[index_attoff];
+		Oid			optype = get_opclass_input_type(opclass->values[index_attoff]);
+
+		if (!AttributeNumberIsValid(table_attno))
+		{
+			IndexInfo *indexInfo PG_USED_FOR_ASSERTS_ONLY;
+
+			/*
+			 * There are two cases to consider. First, a primary key or a unique
+			 * replica identity index cannot contain expressions. So, at this
+			 * point we are sure that the index we are dealing with is neither.
+			 */
+			Assert(RelationGetReplicaIndex(rel) != RelationGetRelid(idxrel) &&
+				   RelationGetPrimaryKeyIndex(rel) != RelationGetRelid(idxrel));
+
+			/*
+			 * At this point, we are also sure that the index does not consist
+			 * of only expressions.
+			 */
+#ifdef USE_ASSERT_CHECKING
+			indexInfo = BuildIndexInfo(idxrel);
+			Assert(!IndexOnlyOnExpression(indexInfo));
+#endif
+
+			/*
+			 * XXX: For a non-primary/unique index with an additional expression, we
+			 * would not have to skip the expression here. However, the code below
+			 * assumes the index scan is only done for simple column references.
+			 */
+			continue;
+		}
 
 		/*
 		 * Load the operator info.  We need this to get the equality operator
 		 * function for the scan key.
 		 */
-		opfamily = get_opclass_family(opclass->values[attoff]);
+		opfamily = get_opclass_family(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
@@ -91,23 +127,25 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
-		ScanKeyInit(&skey[attoff],
-					pkattno,
+		ScanKeyInit(&skey[scankey_attoff],
+					index_attoff + 1,
 					BTEqualStrategyNumber,
 					regop,
-					searchslot->tts_values[mainattno - 1]);
+					searchslot->tts_values[table_attno - 1]);
 
-		skey[attoff].sk_collation = idxrel->rd_indcollation[attoff];
+		skey[scankey_attoff].sk_collation = idxrel->rd_indcollation[index_attoff];
 
 		/* Check for null value. */
-		if (searchslot->tts_isnull[mainattno - 1])
-		{
-			hasnulls = true;
-			skey[attoff].sk_flags |= SK_ISNULL;
-		}
+		if (searchslot->tts_isnull[table_attno - 1])
+			skey[scankey_attoff].sk_flags |= (SK_ISNULL | SK_SEARCHNULL);
+
+		scankey_attoff++;
 	}
 
-	return hasnulls;
+	/* We should always use at least one attribute for the index scan */
+	Assert(scankey_attoff > 0);
+
+	return scankey_attoff;
 }
 
 /*
@@ -128,28 +166,44 @@ RelationFindReplTupleByIndex(Relation rel, Oid idxoid,
 	TransactionId xwait;
 	Relation	idxrel;
 	bool		found;
+	TypeCacheEntry **eq = NULL; /* only used when the index is not unique */
+	bool		indisunique;
+	int			scankey_attoff;
 
 	/* Open the index. */
 	idxrel = index_open(idxoid, RowExclusiveLock);
+	indisunique = idxrel->rd_index->indisunique;
 
 	/* Start an index scan. */
 	InitDirtySnapshot(snap);
-	scan = index_beginscan(rel, idxrel, &snap,
-						   IndexRelationGetNumberOfKeyAttributes(idxrel),
-						   0);
 
 	/* Build scan key. */
-	build_replindex_scan_key(skey, rel, idxrel, searchslot);
+	scankey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
 
+	scan = index_beginscan(rel, idxrel, &snap, scankey_attoff, 0);
 retry:
 	found = false;
 
-	index_rescan(scan, skey, IndexRelationGetNumberOfKeyAttributes(idxrel), NULL, 0);
+	index_rescan(scan, skey, scankey_attoff, NULL, 0);
 
 	/* Try to find the tuple */
-	if (index_getnext_slot(scan, ForwardScanDirection, outslot))
+	while (index_getnext_slot(scan, ForwardScanDirection, outslot))
 	{
-		found = true;
+		/* Avoid expensive equality check if index is unique */
+		if (!indisunique)
+		{
+			/*
+			 * We only need to allocate once. This is allocated within
+			 * per tuple context -- ApplyMessageContext -- hence no
+			 * need to explicitly pfree().
+			 */
+			if (eq == NULL)
+				eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
+
+			if (!tuples_equal(outslot, searchslot, eq))
+				continue;
+		}
+
 		ExecMaterializeSlot(outslot);
 
 		xwait = TransactionIdIsValid(snap.xmin) ?
@@ -164,6 +218,10 @@ retry:
 			XactLockTableWait(xwait, NULL, NULL, XLTW_None);
 			goto retry;
 		}
+
+		/* Found our tuple and it's not locked */
+		found = true;
+		break;
 	}
 
 	/* Found tuple, try to lock it in the lockmode. */
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e989047681..c38f8182c1 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,38 +17,34 @@
 
 #include "postgres.h"
 
+#include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_am_d.h"
 #include "catalog/pg_subscription_rel.h"
+#include "catalog/pg_operator.h"
+#include "commands/defrem.h"
 #include "executor/executor.h"
 #include "nodes/makefuncs.h"
 #include "replication/logicalrelation.h"
 #include "replication/worker_internal.h"
+#include "optimizer/cost.h"
+#include "optimizer/paramassign.h"
+#include "optimizer/pathnode.h"
+#include "optimizer/paths.h"
+#include "optimizer/plancat.h"
+#include "optimizer/restrictinfo.h"
 #include "utils/inval.h"
+#include "utils/typcache.h"
 
 
 static MemoryContext LogicalRepRelMapContext = NULL;
-
 static HTAB *LogicalRepRelMap = NULL;
-
-/*
- * Partition map (LogicalRepPartMap)
- *
- * When a partitioned table is used as replication target, replicated
- * operations are actually performed on its leaf partitions, which requires
- * the partitions to also be mapped to the remote relation.  Parent's entry
- * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
- * individual partitions may have different attribute numbers, which means
- * attribute mappings to remote relation's attributes must be maintained
- * separately for each partition.
- */
 static MemoryContext LogicalRepPartMapContext = NULL;
 static HTAB *LogicalRepPartMap = NULL;
-typedef struct LogicalRepPartMapEntry
-{
-	Oid			partoid;		/* LogicalRepPartMap's key */
-	LogicalRepRelMapEntry relmapentry;
-} LogicalRepPartMapEntry;
+
+static Oid	FindLogicalRepUsableIndex(Relation localrel,
+									  LogicalRepRelation *remoterel);
 
 /*
  * Relcache invalidation callback for our relation map cache.
@@ -438,6 +434,12 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		 */
 		logicalrep_rel_mark_updatable(entry);
 
+		/*
+		 * Finding a usable index is an infrequent task. It occurs when
+		 * an operation is first performed on the relation, or after
+		 * invalidation of the relation cache entry (e.g., by ANALYZE).
+		 */
+		entry->usableIndexOid = FindLogicalRepUsableIndex(entry->localrel, remoterel);
 		entry->localrelvalid = true;
 	}
 
@@ -581,7 +583,7 @@ logicalrep_partmap_init(void)
  * Note there's no logicalrep_partition_close, because the caller closes the
  * component relation.
  */
-LogicalRepRelMapEntry *
+LogicalRepPartMapEntry *
 logicalrep_partition_open(LogicalRepRelMapEntry *root,
 						  Relation partrel, AttrMap *map)
 {
@@ -615,7 +617,7 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	if (found && entry->localrelvalid)
 	{
 		entry->localrel = partrel;
-		return entry;
+		return part_entry;
 	}
 
 	/* Switch to longer-lived context. */
@@ -696,10 +698,368 @@ logicalrep_partition_open(LogicalRepRelMapEntry *root,
 	/* Set if the table's replica identity is enough to apply update/delete. */
 	logicalrep_rel_mark_updatable(entry);
 
+	/*
+	 * Finding a usable index is an infrequent task. It occurs when
+	 * an operation is first performed on the relation, or after
+	 * invalidation of the relation cache entry (e.g., by ANALYZE).
+	 */
+	entry->usableIndexOid = FindLogicalRepUsableIndex(partrel, remoterel);
+
 	entry->localrelvalid = true;
 
 	/* state and statelsn are left set to 0. */
 	MemoryContextSwitchTo(oldctx);
 
-	return entry;
+	return part_entry;
+}
+
+/*
+ * Returns a valid index oid if the input path is an index path.
+ *
+ * Otherwise, returns InvalidOid.
+ */
+static Oid
+GetIndexOidFromPath(Path *path)
+{
+	if (path->pathtype == T_IndexScan || path->pathtype == T_IndexOnlyScan)
+	{
+		IndexPath  *index_sc = (IndexPath *) path;
+		return index_sc->indexinfo->indexoid;
+	}
+
+	return InvalidOid;
+}
+
+/*
+ * Returns true if the given index consists only of expressions such as:
+ * 	CREATE INDEX idx ON table(foo(col));
+ *
+ * Returns false if there is at least one column reference:
+ * 	 CREATE INDEX idx ON table(foo(col), col_2);
+ */
+bool
+IndexOnlyOnExpression(IndexInfo *indexInfo)
+{
+	for (int i = 0; i < indexInfo->ii_NumIndexKeyAttrs; i++)
+	{
+		AttrNumber	attnum = indexInfo->ii_IndexAttrNumbers[i];
+
+		if (AttributeNumberIsValid(attnum))
+			return false;
+	}
+
+	return true;
+}
+
+/*
+ * Iterates over the input path list and returns another path list
+ * where paths with non-btree indexes, partial indexes or
+ * indexes on only expressions have been removed.
+ */
+static List *
+FilterOutNotSuitablePathsForReplIdentFull(List *pathlist)
+{
+	ListCell   *lc;
+	List *suitableIndexList = NIL;
+
+	foreach(lc, pathlist)
+	{
+		Path	   *path = (Path *) lfirst(lc);
+		Oid indexOid = GetIndexOidFromPath(path);
+
+		if (!OidIsValid(indexOid))
+		{
+			/* Not an index path, keep it as-is */
+			suitableIndexList = lappend(suitableIndexList, path);
+		}
+		else
+		{
+			Relation indexRelation;
+			IndexInfo *indexInfo;
+			bool is_btree;
+			bool is_partial;
+			bool is_only_on_expression;
+
+			indexRelation = index_open(indexOid, AccessShareLock);
+			indexInfo = BuildIndexInfo(indexRelation);
+			is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+			is_partial = (indexInfo->ii_Predicate != NIL);
+			is_only_on_expression = IndexOnlyOnExpression(indexInfo);
+			index_close(indexRelation, NoLock);
+
+			if (is_btree && !is_partial && !is_only_on_expression)
+				suitableIndexList = lappend(suitableIndexList, path);
+		}
+	}
+
+	return suitableIndexList;
+}
+
+/*
+ * Generate all the possible paths for the given subscriber relation,
+ * for the cases that the source relation is replicated via REPLICA
+ * IDENTITY FULL. The function returns the cheapest Path among the
+ * eligible paths, see FilterOutNotSuitablePathsForReplIdentFull().
+ *
+ * The function is guaranteed to return a path, because it adds a
+ * sequential scan path if needed.
+ *
+ * The function assumes that all the columns will be provided during
+ * the execution phase, given that REPLICA IDENTITY FULL guarantees
+ * that.
+ */
+static Path *
+GetCheapestReplicaIdentityFullPath(Relation localrel)
+{
+	PlannerInfo *root;
+	Query	   *query;
+	PlannerGlobal *glob;
+	RangeTblEntry *rte;
+	RelOptInfo *rel;
+	int	attno;
+	RangeTblRef *rt;
+	List *joinList;
+	Path *seqScanPath;
+
+	/* Set up mostly-dummy planner state */
+	query = makeNode(Query);
+	query->commandType = CMD_SELECT;
+
+	glob = makeNode(PlannerGlobal);
+
+	root = makeNode(PlannerInfo);
+	root->parse = query;
+	root->glob = glob;
+	root->query_level = 1;
+	root->planner_cxt = CurrentMemoryContext;
+	root->wt_param_id = -1;
+
+	/* Build a minimal RTE for the rel */
+	rte = makeNode(RangeTblEntry);
+	rte->rtekind = RTE_RELATION;
+	rte->relid = localrel->rd_id;
+	rte->relkind = RELKIND_RELATION;
+	rte->rellockmode = AccessShareLock;
+	rte->lateral = false;
+	rte->inh = false;
+	rte->inFromCl = true;
+	query->rtable = list_make1(rte);
+
+	rt = makeNode(RangeTblRef);
+	rt->rtindex = 1;
+	joinList = list_make1(rt);
+
+	/* Set up RTE/RelOptInfo arrays */
+	setup_simple_rel_arrays(root);
+
+	/* Build RelOptInfo */
+	rel = build_simple_rel(root, 1, NULL);
+
+	/*
+	 * Generate restrictions for all columns in the form of col_1 = $1
+	 * AND col_2 = $2 ...
+	 */
+	for (attno = 0; attno < RelationGetNumberOfAttributes(localrel); attno++)
+	{
+		Form_pg_attribute attr = TupleDescAttr(localrel->rd_att, attno);
+
+		if (!attr->attisdropped)
+		{
+			Expr	   *eq_op;
+			TypeCacheEntry *typentry;
+			RestrictInfo *restrict_info;
+			Var		   *leftarg;
+			Param	   *rightarg;
+			int			varno = 1;
+
+			typentry = lookup_type_cache(attr->atttypid,
+										 TYPECACHE_EQ_OPR_FINFO);
+
+			if (!OidIsValid(typentry->eq_opr))
+				continue;		/* no equality operator, skip this column */
+
+			leftarg =
+				makeVar(varno, attr->attnum, attr->atttypid, attr->atttypmod,
+						attr->attcollation, 0);
+
+			rightarg = makeNode(Param);
+			rightarg->paramkind = PARAM_EXTERN;
+			rightarg->paramid = list_length(rel->baserestrictinfo) + 1;
+			rightarg->paramtype = attr->atttypid;
+			rightarg->paramtypmod = attr->atttypmod;
+			rightarg->paramcollid = attr->attcollation;
+			rightarg->location = -1;
+
+			eq_op = make_opclause(typentry->eq_opr, BOOLOID, false,
+								  (Expr *) leftarg, (Expr *) rightarg,
+								  InvalidOid, attr->attcollation);
+
+			restrict_info = make_simple_restrictinfo(root, eq_op);
+
+			rel->baserestrictinfo = lappend(rel->baserestrictinfo, restrict_info);
+		}
+	}
+
+	/*
+	 * Make sure the planner generates the relevant paths, including
+	 * all the possible index scans as well as sequential scan.
+	 */
+	rel = make_one_rel(root, joinList);
+
+	/*
+	 * Currently it is not possible for the planner to pick
+	 * a partial index or an index only on expressions. We
+	 * still want to be explicit and eliminate such
+	 * paths proactively.
+	 *
+	 * The reason the planner would not pick partial
+	 * indexes and indexes with only expressions is the
+	 * way the baserestrictinfos are currently
+	 * formed (e.g., col_1 = $1 ... AND col_N = $N).
+	 *
+	 * For the partial indexes, check_index_predicates()
+	 * (via operator_predicate_proof()) checks whether the
+	 * predicate of the index is implied by the
+	 * baserestrictinfos. The check always returns false
+	 * because index predicates are formed with CONSTs and
+	 * baserestrictinfos are formed with PARAMs. Hence,
+	 * partial indexes are never picked.
+	 *
+	 * Indexes that consist of only expressions (i.e.,
+	 * no simple column references on the index) are also
+	 * eliminated with similar reasoning.
+	 * match_restriction_clauses_to_index() (via
+	 * match_index_to_operand()) eliminates the use of the
+	 * index if the restriction's operand does not match
+	 * the index expression.
+	 *
+	 * XXX: We also eliminate non-btree indexes, which could be
+	 * relaxed if needed. If we allow non-btree indexes, we should
+	 * adjust RelationFindReplTupleByIndex() to support such indexes.
+	 */
+	rel->pathlist =
+		FilterOutNotSuitablePathsForReplIdentFull(rel->pathlist);
+
+	if (rel->pathlist == NIL)
+	{
+		/*
+		 * A sequential scan could have been dominated by
+		 * an index scan during make_one_rel(). We should always
+		 * have a sequential scan before set_cheapest().
+		 */
+		seqScanPath = create_seqscan_path(root, rel, NULL, 0);
+		add_path(rel, seqScanPath);
+	}
+
+	set_cheapest(rel);
+
+	Assert(rel->cheapest_total_path != NULL);
+
+	return rel->cheapest_total_path;
+}
+
+/*
+ * Returns an index oid if the planner submodules pick index scans
+ * over sequential scan.
+ *
+ * Otherwise, returns InvalidOid.
+ *
+ * Note that this is not a generic function; it expects REPLICA
+ * IDENTITY FULL for the remote relation.
+ */
+static Oid
+FindUsableIndexForReplicaIdentityFull(Relation localrel)
+{
+	MemoryContext usableIndexContext;
+	MemoryContext oldctx;
+	Path	   *cheapest_total_path;
+	Oid			idxoid;
+
+	usableIndexContext = AllocSetContextCreate(CurrentMemoryContext,
+											   "usableIndexContext",
+											   ALLOCSET_DEFAULT_SIZES);
+	oldctx = MemoryContextSwitchTo(usableIndexContext);
+
+	cheapest_total_path = GetCheapestReplicaIdentityFullPath(localrel);
+
+	idxoid = GetIndexOidFromPath(cheapest_total_path);
+
+	MemoryContextSwitchTo(oldctx);
+
+	MemoryContextDelete(usableIndexContext);
+
+	return idxoid;
+}
+
+/*
+ * Get replica identity index or if it is not defined a primary key.
+ *
+ * If neither is defined, returns InvalidOid
+ */
+static Oid
+GetRelationIdentityOrPK(Relation rel)
+{
+	Oid			idxoid;
+
+	idxoid = RelationGetReplicaIndex(rel);
+
+	if (!OidIsValid(idxoid))
+		idxoid = RelationGetPrimaryKeyIndex(rel);
+
+	return idxoid;
+}
+
+/*
+ * Returns an index oid if we can use an index for the apply side. If not,
+ * returns InvalidOid.
+ */
+static Oid
+FindLogicalRepUsableIndex(Relation localrel, LogicalRepRelation *remoterel)
+{
+	Oid			idxoid;
+
+	/*
+	 * We never need index oid for partitioned tables, always rely on leaf
+	 * partition's index.
+	 */
+	if (localrel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+		return InvalidOid;
+
+	/* Simple case, we already have an identity or pkey */
+	idxoid = GetRelationIdentityOrPK(localrel);
+	if (OidIsValid(idxoid))
+		return idxoid;
+
+	/*
+	 * If index scans are disabled, use a sequential scan.
+	 *
+	 * Note that we still allow index scans above when there is a primary
+	 * key or unique index replica identity, but that is the legacy behaviour
+	 * (even when enable_indexscan is false), so we hesitate to move this
+	 * enable_indexscan check to be done earlier in this function.
+	 */
+	if (!enable_indexscan)
+		return InvalidOid;
+
+	if (remoterel->replident == REPLICA_IDENTITY_FULL &&
+		RelationGetIndexList(localrel) != NIL)
+	{
+		/*
+		 * If we had a primary key or replica identity with a unique index,
+		 * we would have already found and returned that oid. At this point,
+		 * the remote relation has replica identity full and we have at
+		 * least one local index defined.
+		 *
+		 * We are looking for one more opportunity for using an index. If
+		 * there are any indexes defined on the local relation, try to pick
+		 * the cheapest index.
+		 *
+		 * The index selection safely assumes that all the columns are going
+		 * to be available for the index scan given that remote relation has
+		 * replica identity full.
+		 */
+		return FindUsableIndexForReplicaIdentityFull(localrel);
+	}
+
+	return InvalidOid;
 }
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5f8c541763..548d892890 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -331,6 +331,8 @@ static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
+static Oid	usable_indexoid_internal(ApplyExecutionData *edata,
+									 ResultRelInfo *relinfo);
 static void apply_handle_update_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot,
@@ -339,6 +341,7 @@ static void apply_handle_delete_internal(ApplyExecutionData *edata,
 										 ResultRelInfo *relinfo,
 										 TupleTableSlot *remoteslot);
 static bool FindReplTupleInLocalRel(EState *estate, Relation localrel,
+									Oid localidxoid,
 									LogicalRepRelation *remoterel,
 									TupleTableSlot *remoteslot,
 									TupleTableSlot **localslot);
@@ -1586,24 +1589,6 @@ apply_handle_type(StringInfo s)
 	logicalrep_read_typ(s, &typ);
 }
 
-/*
- * Get replica identity index or if it is not defined a primary key.
- *
- * If neither is defined, returns InvalidOid
- */
-static Oid
-GetRelationIdentityOrPK(Relation rel)
-{
-	Oid			idxoid;
-
-	idxoid = RelationGetReplicaIndex(rel);
-
-	if (!OidIsValid(idxoid))
-		idxoid = RelationGetPrimaryKeyIndex(rel);
-
-	return idxoid;
-}
-
 /*
  * Check that we (the subscription owner) have sufficient privileges on the
  * target relation to perform the given operation.
@@ -1753,7 +1738,7 @@ check_relation_updatable(LogicalRepRelMapEntry *rel)
 	 * We are in error mode so it's fine this is somewhat slow. It's better to
 	 * give user correct error.
 	 */
-	if (OidIsValid(GetRelationIdentityOrPK(rel->localrel)))
+	if (OidIsValid(rel->usableIndexOid))
 	{
 		ereport(ERROR,
 				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1896,11 +1881,13 @@ apply_handle_update_internal(ApplyExecutionData *edata,
 	TupleTableSlot *localslot;
 	bool		found;
 	MemoryContext oldctx;
+	Oid			usableIndexOid = usable_indexoid_internal(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
 	found = FindReplTupleInLocalRel(estate, localrel,
+									usableIndexOid,
 									&relmapentry->remoterel,
 									remoteslot, &localslot);
 	ExecClearTuple(remoteslot);
@@ -2034,12 +2021,13 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EPQState	epqstate;
 	TupleTableSlot *localslot;
 	bool		found;
+	Oid			usableIndexOid = usable_indexoid_internal(edata, relinfo);
 
 	EvalPlanQualInit(&epqstate, estate, NULL, NIL, -1);
 	ExecOpenIndices(relinfo, false);
 
-	found = FindReplTupleInLocalRel(estate, localrel, remoterel,
-									remoteslot, &localslot);
+	found = FindReplTupleInLocalRel(estate, localrel, usableIndexOid,
+									remoterel, remoteslot, &localslot);
 
 	/* If found delete it. */
 	if (found)
@@ -2069,20 +2057,56 @@ apply_handle_delete_internal(ApplyExecutionData *edata,
 	EvalPlanQualEnd(&epqstate);
 }
 
+/*
+ * Decide whether we can pick an index for the relinfo (i.e., the relation)
+ * we're actually deleting/updating from. If it is a child partition of
+ * edata->targetRelInfo, find the index on the partition.
+ *
+ * Note that if the corresponding relmapentry has InvalidOid usableIndexOid,
+ * the function returns InvalidOid.
+ */
+static Oid
+usable_indexoid_internal(ApplyExecutionData *edata, ResultRelInfo *relinfo)
+{
+	ResultRelInfo *targetResultRelInfo = edata->targetRelInfo;
+	LogicalRepRelMapEntry *relmapentry = edata->targetRel;
+
+	Oid			targetrelid = targetResultRelInfo->ri_RelationDesc->rd_rel->oid;
+	Oid			localrelid = relinfo->ri_RelationDesc->rd_id;
+
+	if (targetrelid != localrelid)
+	{
+		/* Target is a partitioned table, so find relmapentry of the partition */
+		TupleConversionMap *map = relinfo->ri_RootToPartitionMap;
+		AttrMap    *attrmap = map ? map->attrMap : NULL;
+
+		LogicalRepPartMapEntry *part_entry =
+		logicalrep_partition_open(relmapentry, relinfo->ri_RelationDesc,
+								  attrmap);
+
+		Assert(targetResultRelInfo->ri_RelationDesc->rd_rel->relkind ==
+			   RELKIND_PARTITIONED_TABLE);
+
+		relmapentry = &(part_entry->relmapentry);
+	}
+
+	return relmapentry->usableIndexOid;
+}
+
 /*
  * Try to find a tuple received from the publication side (in 'remoteslot') in
  * the corresponding local relation using either replica identity index,
- * primary key or if needed, sequential scan.
+ * primary key, another usable index or, if needed, sequential scan.
  *
  * Local tuple, if found, is returned in '*localslot'.
  */
 static bool
 FindReplTupleInLocalRel(EState *estate, Relation localrel,
+						Oid localidxoid,
 						LogicalRepRelation *remoterel,
 						TupleTableSlot *remoteslot,
 						TupleTableSlot **localslot)
 {
-	Oid			idxoid;
 	bool		found;
 
 	/*
@@ -2093,12 +2117,11 @@ FindReplTupleInLocalRel(EState *estate, Relation localrel,
 
 	*localslot = table_slot_create(localrel, &estate->es_tupleTable);
 
-	idxoid = GetRelationIdentityOrPK(localrel);
-	Assert(OidIsValid(idxoid) ||
+	Assert(OidIsValid(localidxoid) ||
 		   (remoterel->replident == REPLICA_IDENTITY_FULL));
 
-	if (OidIsValid(idxoid))
-		found = RelationFindReplTupleByIndex(localrel, idxoid,
+	if (OidIsValid(localidxoid))
+		found = RelationFindReplTupleByIndex(localrel, localidxoid,
 											 LockTupleExclusive,
 											 remoteslot, *localslot);
 	else
@@ -2128,7 +2151,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	TupleTableSlot *remoteslot_part;
 	TupleConversionMap *map;
 	MemoryContext oldctx;
-	LogicalRepRelMapEntry *part_entry = NULL;
+	LogicalRepPartMapEntry *part_entry = NULL;
 	AttrMap    *attrmap = NULL;
 
 	/* ModifyTableState is needed for ExecFindPartition(). */
@@ -2178,7 +2201,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 	{
 		part_entry = logicalrep_partition_open(relmapentry, partrel,
 											   attrmap);
-		check_relation_updatable(part_entry);
+		check_relation_updatable(&part_entry->relmapentry);
 	}
 
 	switch (operation)
@@ -2202,13 +2225,15 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 			 * suitable partition.
 			 */
 			{
+				LogicalRepRelMapEntry *entry = &part_entry->relmapentry;
 				TupleTableSlot *localslot;
 				ResultRelInfo *partrelinfo_new;
 				bool		found;
 
 				/* Get the matching local tuple from the partition. */
 				found = FindReplTupleInLocalRel(estate, partrel,
-												&part_entry->remoterel,
+												entry->usableIndexOid,
+												&entry->remoterel,
 												remoteslot_part, &localslot);
 				if (!found)
 				{
@@ -2230,7 +2255,7 @@ apply_handle_tuple_routing(ApplyExecutionData *edata,
 				 * remoteslot_part.
 				 */
 				oldctx = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-				slot_modify_data(remoteslot_part, localslot, part_entry,
+				slot_modify_data(remoteslot_part, localslot, entry,
 								 newtup);
 				MemoryContextSwitchTo(oldctx);
 
diff --git a/src/include/replication/logicalrelation.h b/src/include/replication/logicalrelation.h
index 78cd7e77f5..ada4965230 100644
--- a/src/include/replication/logicalrelation.h
+++ b/src/include/replication/logicalrelation.h
@@ -13,6 +13,7 @@
 #define LOGICALRELATION_H
 
 #include "access/attmap.h"
+#include "catalog/index.h"
 #include "replication/logicalproto.h"
 
 typedef struct LogicalRepRelMapEntry
@@ -31,20 +32,39 @@ typedef struct LogicalRepRelMapEntry
 	Relation	localrel;		/* relcache entry (NULL when closed) */
 	AttrMap    *attrmap;		/* map of local attributes to remote ones */
 	bool		updatable;		/* Can apply updates/deletes? */
+	Oid			usableIndexOid; /* which index to use, or InvalidOid if none */
 
 	/* Sync state. */
 	char		state;
 	XLogRecPtr	statelsn;
 } LogicalRepRelMapEntry;
 
+/*
+ * Used for Partition mapping (see LogicalRepPartMap in logical/relation.c)
+ *
+ * When a partitioned table is used as replication target, replicated
+ * operations are actually performed on its leaf partitions, which requires
+ * the partitions to also be mapped to the remote relation.  Parent's entry
+ * (LogicalRepRelMapEntry) cannot be used as-is for all partitions, because
+ * individual partitions may have different attribute numbers, which means
+ * attribute mappings to remote relation's attributes must be maintained
+ * separately for each partition.
+ */
+typedef struct LogicalRepPartMapEntry
+{
+	Oid			partoid;		/* LogicalRepPartMap's key */
+	LogicalRepRelMapEntry relmapentry;
+} LogicalRepPartMapEntry;
+
 extern void logicalrep_relmap_update(LogicalRepRelation *remoterel);
 extern void logicalrep_partmap_reset_relmap(LogicalRepRelation *remoterel);
 
 extern LogicalRepRelMapEntry *logicalrep_rel_open(LogicalRepRelId remoteid,
 												  LOCKMODE lockmode);
-extern LogicalRepRelMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
-														Relation partrel, AttrMap *map);
+extern LogicalRepPartMapEntry *logicalrep_partition_open(LogicalRepRelMapEntry *root,
+														 Relation partrel, AttrMap *map);
 extern void logicalrep_rel_close(LogicalRepRelMapEntry *rel,
 								 LOCKMODE lockmode);
+extern bool IndexOnlyOnExpression(IndexInfo *indexInfo);
 
 #endif							/* LOGICALRELATION_H */
diff --git a/src/test/subscription/t/032_subscribe_use_index.pl b/src/test/subscription/t/032_subscribe_use_index.pl
new file mode 100644
index 0000000000..bfc08fdf0f
--- /dev/null
+++ b/src/test/subscription/t/032_subscribe_use_index.pl
@@ -0,0 +1,937 @@
+# Copyright (c) 2021-2022, PostgreSQL Global Development Group
+
+# Test logical replication behavior when the subscriber uses an available index
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->append_conf('postgresql.conf',
+	"wal_retrieve_retry_interval = 1ms");
+
+# we don't want planner to pick bitmap scans instead of index scans
+$node_subscriber->append_conf('postgresql.conf',
+	"enable_bitmapscan = off");
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname           = 'tap_sub';
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX
+#
+# Basic test where the subscriber uses index
+# and only touches 1 row
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 1) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates one row via index";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x = 20;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 2) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 deletes one row via index";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX MODIFIES MULTIPLE ROWS
+#
+# Basic test where the subscriber uses index
+# and touches multiple rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x)");
+
+# insert 1001 rows of initial data with values in the range 0-19
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i%20 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_2 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 50 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 50) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_2 updates 50 rows via index";
+
+my $count_from_table =
+  $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x = 15;");
+is($count_from_table, qq(0),
+	'check subscriber tap_sub_rep_full_2 for no rows remaining with x=15');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_2");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX MODIFIES MULTIPLE ROWS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+#
+# Basic test where the subscriber uses a multi-column index
+# and touches multiple rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert 1001 rows of initial data with values in the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_3 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 200 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_3 updates 200 rows via index";
+
+my $sum_from_table =
+  $node_subscriber->safe_psql('postgres',
+	"select sum(x+y::int) from test_replica_id_full;");
+is($sum_from_table, qq(9200),
+	'check subscriber tap_sub_rep_full_3 for query result');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_3");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH MULTIPLE COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+#
+# Basic test where the subscriber uses index
+# and touches multiple rows
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (drop_1 jsonb, x int, drop_2 point, y text, drop_3 timestamptz)"
+);
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_1");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_2");
+$node_subscriber->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full DROP COLUMN drop_3");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y)");
+
+# insert 1001 rows of initial data with values in the range 0-9
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT (i%10), (i%10)::text FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_4 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates 200 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x IN (5, 6);");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select (idx_scan = 200) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_4 updates 200 rows via index";
+
+my $sum_from_table_2 =
+  $node_subscriber->safe_psql('postgres',
+	"select sum(x+y::int) from test_replica_id_full;");
+is($sum_from_table_2, qq(9200),
+	'check subscriber tap_sub_rep_full_4 for query result');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_4");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: SUBSCRIPTION USES INDEX WITH DROPPED COLUMNS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (value_1);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_part_idx ON users_table_part(user_id, value_1)"
+);
+
+# insert some initial data within the range 0-1000
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%100), (i%20), i FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# updates rows and moves between partitions
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE user_id = 4;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=10 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+# deletes rows from both partitions
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 1 and value_1 = 1;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE user_id = 12 and value_1 = 12;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=30 from pg_stat_all_indexes where indexrelname ilike 'users_table_part_%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 deletes rows on partitioned table";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION USES INDEX ON PARTITIONED TABLES
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full_part_index REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full_part_index (x int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_part_idx ON test_replica_id_full_part_index(x) WHERE (x = 5);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full_part_index SELECT i FROM generate_series(0,21)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full_part_index");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows, one of them is indexed
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 5;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full_part_index SET x = x + 1 WHERE x = 15;");
+$node_publisher->wait_for_catchup($appname);
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via seq. scan with partial index";
+
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_part_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via seq. scan with partial index";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full_part_index");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE PARTIAL INDEX
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people ((firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0,200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_2' AND lastname = 'last_name_2';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via seq. scan with index on expressions";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_3';");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'first_name_4' AND lastname = 'last_name_4';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 deletes two rows via seq. scan with index on expressions";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION DOES NOT USE INDEXES WITH ONLY EXPRESSIONS
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE people REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE people (firstname text, lastname text);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX people_names ON people (firstname, lastname, (firstname || ' ' || lastname));");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO people SELECT 'first_name_' || i::text, 'last_name_' || i::text FROM generate_series(0, 200)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE people");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_1';");
+$node_publisher->safe_psql('postgres',
+	"UPDATE people SET firstname = 'Nan' WHERE firstname = 'first_name_3' AND lastname = 'last_name_3';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on expressions and columns";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM people WHERE firstname = 'Nan';");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'people_names';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 deletes two rows via index scan with index on expressions and columns";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE people");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE people");
+
+# Testcase end: SUBSCRIPTION CAN USE INDEXES WITH EXPRESSIONS AND COLUMNS
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test (column_a int, column_b int);");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test REPLICA IDENTITY FULL;");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test (column_a int, column_b int);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_a ON test (column_a);");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_b ON test (column_b);");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test SELECT i,0 FROM generate_series(0, 2000)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_0 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres', "ANALYZE test;");
+
+# update one row and delete another
+$node_publisher->safe_psql('postgres',
+	"UPDATE test SET column_b = column_b + 1 WHERE column_a = 15;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test WHERE column_a = 20;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-1";
+
+# do not use index_b
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=0 from pg_stat_all_indexes where indexrelname = 'index_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-2";
+
+# insert data such that the cardinality of column_b becomes much higher
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test SELECT 0,i FROM generate_series(0, 20000)i;");
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres', "ANALYZE test;");
+
+# update one row and delete another
+$node_publisher->safe_psql('postgres',
+	"UPDATE test SET column_a = column_a + 1 WHERE column_b = 150;");
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test WHERE column_b = 200;");
+
+
+# do not use index_a anymore
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-3";
+
+# now, use index_b as well
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'index_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_0 updates two rows via index scan with index on high cardinality column-4";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test");
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_0");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE
+# ====================================================================
+
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - PARTITIONED TABLE
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (user_id);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_0 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE users_table_part_1 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part(user_id bigint, value_1 int, value_2 int) PARTITION BY RANGE (user_id);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_0 PARTITION OF users_table_part FOR VALUES FROM (0) TO (10);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE users_table_part_1 PARTITION OF users_table_part FOR VALUES FROM (10) TO (20);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_1 ON users_table_part(value_1)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_2 ON users_table_part(value_2)"
+);
+
+# insert some initial data where cardinality of value_1 is high, and cardinality of value_2 is very low
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i, i%2 FROM generate_series(0,1000)i;"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE users_table_part");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+# update rows via the partitioned table (value_1 is not the partition key)
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_1 = 30;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=1 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%1%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE value_1 = 40");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=2 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%1%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 deletes from partitioned table";
+
+
+# now, load some more data where cardinality of value_2 is high, and cardinality of value_1 is very low
+$node_publisher->safe_psql('postgres', "TRUNCATE users_table_part");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i%2, i FROM generate_series(0,10000)i;"
+);
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_2 = 3000;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=1 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%2%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM users_table_part WHERE value_2 = 4000");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=2 from pg_stat_all_indexes where indexrelname ilike 'users_table%value%2%';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 deletes from partitioned table";
+
+
+# finally, make sure that even if an index is defined only on a partition (i.e., not inherited from the parent),
+# we can still use it
+
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX users_table_ind_on_value_1;"
+);
+$node_subscriber->safe_psql('postgres',
+	"DROP INDEX users_table_ind_on_value_2;"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX users_table_ind_on_value_2 ON users_table_part_0(value_2)"
+);
+
+# now, load some more data where cardinality of value_2 is high, and cardinality of value_1 is very low
+$node_publisher->safe_psql('postgres', "TRUNCATE users_table_part");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO users_table_part SELECT (i%20), i%2, i FROM generate_series(0,10000)i;"
+);
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->safe_psql('postgres',	"ANALYZE users_table_part");
+
+$node_publisher->safe_psql('postgres',
+	"UPDATE users_table_part SET value_1 = 0 WHERE value_2 > 3000 AND user_id = 0;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(idx_scan)=350 from pg_stat_all_indexes where indexrelname ilike 'users_table_ind_on_value_2';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates partitioned table with index on partition";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE users_table_part");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - PARTITIONED TABLE
+# ====================================================================
+
+# ====================================================================
+# Testcase start: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - INHERITED TABLE
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE parent (a int);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE child_1 (b int) inherits (parent);"
+);
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE child_2 (b int) inherits (parent);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE parent REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE child_1 REPLICA IDENTITY FULL;");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE child_2 REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE parent (a int);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE child_1 (b int) inherits (parent);"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE child_2 (b int) inherits (parent);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_parent ON parent(a)"
+);
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_child_1_a ON child_1(a)"
+);
+
+# create another index on the child on a column with higher cardinality
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX index_on_child_1_b ON child_1(b)"
+);
+
+# insert some initial data where cardinality of column a is high, and cardinality of column b is very low
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO parent SELECT i FROM generate_series(0,1000)i;");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_1 SELECT (i%500), 0 FROM generate_series(0,1000)i;");
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_2 SELECT (i%500), 0 FROM generate_series(0,1000)i;");
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE parent");
+
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+$node_subscriber->safe_psql('postgres',	"ANALYZE parent");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_1");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_2");
+
+# update rows through the parent table
+$node_publisher->safe_psql('postgres',
+	"UPDATE parent SET a = 0 WHERE a = 10;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'index_on_parent';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates parent table";
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM child_1 WHERE a = 250");
+
+# we check if the index is used or not. 2 rows from the first command, another 2 from the second command
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=4 from pg_stat_all_indexes where indexrelname = 'index_on_child_1_a';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 deletes from child_1 table";
+
+
+# insert some more data where cardinality of column b is high on child_1
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO child_1 SELECT 0, i FROM generate_series(0,10000)i;",
+);
+$node_publisher->wait_for_catchup($appname);
+
+# ANALYZE on parent alone wouldn't change the index used on child_1 (still index_on_child_1_a), so ANALYZE child_1 too
+$node_subscriber->safe_psql('postgres',	"ANALYZE parent");
+$node_subscriber->safe_psql('postgres',	"ANALYZE child_1");
+
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM child_1 WHERE b = 41");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=1 from pg_stat_all_indexes where indexrelname = 'index_on_child_1_b';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 deletes from child_1 table";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE parent, child_1, child_2");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE parent, child_1, child_2");
+
+# Testcase end: SUBSCRIPTION CAN UPDATE THE INDEX IT USES AFTER ANALYZE - INHERITED TABLE
+# ====================================================================
+
+# ====================================================================
+# Testcase start: Some NULL values
+
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int);"
+);
+
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL;");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y int);"
+);
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full(x,y);"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full_5 CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync;
+
+
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full VALUES (1), (2), (3);");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 1;");
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = x + 1 WHERE x = 3;");
+
+# we check if the index is used or not
+$node_subscriber->poll_query_until(
+	'postgres', q{select idx_scan=2 from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates test_replica_id_full table";
+
+# also, make sure results are expected
+$node_subscriber->poll_query_until(
+	'postgres', q{select sum(x)=8 from test_replica_id_full;}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates table with null values";
+
+$node_subscriber->poll_query_until(
+	'postgres', q{select count(*)=3 from test_replica_id_full WHERE y IS NULL;}
+) or die "Timed out while waiting for check subscriber tap_sub_rep_full_5 updates table with null values - 2";
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# cleanup sub
+$node_subscriber->safe_psql('postgres',
+	"DROP SUBSCRIPTION tap_sub_rep_full_5");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Some NULL values
+# ====================================================================
+
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');
+
+done_testing();
-- 
2.34.1
