From 9b102ac120313b4443e15a7d3d46a59a8b263ff3 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 19 Jun 2023 08:28:00 +0000
Subject: [PATCH v2] Allow to use Hash index on subscriber

---
 src/backend/executor/execReplication.c     | 69 ++++++++++++++--
 src/backend/replication/logical/relation.c | 27 +++++--
 src/backend/utils/cache/lsyscache.c        | 22 +++++
 src/include/executor/executor.h            |  3 +
 src/include/utils/lsyscache.h              |  1 +
 src/test/subscription/meson.build          |  1 +
 src/test/subscription/t/034_hash.pl        | 93 ++++++++++++++++++++++
 7 files changed, 204 insertions(+), 12 deletions(-)
 create mode 100644 src/test/subscription/t/034_hash.pl

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 9dd7168461..023585b619 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -19,6 +19,8 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_am_d.h"
+#include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -41,15 +43,67 @@
 static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
 						 TypeCacheEntry **eq);
 
+/*
+ * Map an index access method OID to the strategy number of its equality
+ * operator; InvalidStrategy is returned for unsupported access methods.
+ *
+ * TODO: support other indexes: GiST, SP-GiST, other user-defined indexes.
+ */
+int
+get_equal_strategy_number_for_am(Oid am)
+{
+	switch (am)
+	{
+		case BTREE_AM_OID:
+			return BTEqualStrategyNumber;
+
+		case HASH_AM_OID:
+			return HTEqualStrategyNumber;
+
+		case GIST_AM_OID:
+		case SPGIST_AM_OID:
+			/* TODO */
+			break;
+
+		case GIN_AM_OID:
+		case BRIN_AM_OID:
+			/*
+			 * GIN and BRIN are unsupported: they lack the amgettuple API.
+			 */
+			break;
+
+		default:
+			/* XXX: Do we have to support extended indexes? */
+			break;
+	}
+
+	return InvalidStrategy;
+}
+
+/*
+ * Return the strategy number of the equality operator for the index access
+ * method that the given operator class belongs to, or InvalidStrategy when
+ * that access method is not supported.
+ *
+ * TODO: support other indexes: GiST, SP-GiST, other user-defined indexes.
+ */
+int
+get_equal_strategy_number(Oid opclass)
+{
+	/* The mapping is per access method, so resolve the opclass's AM first. */
+	return get_equal_strategy_number_for_am(get_opclass_method(opclass));
+}
+
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
  * Returns how many columns to use for the index scan.
  *
- * This is not generic routine, it expects the idxrel to be a btree, non-partial
- * and have at least one column reference (i.e. cannot consist of only
- * expressions).
+ * This is not generic routine, it expects the idxrel to be a btree or a hash,
+ * non-partial and have at least one column reference (i.e. cannot consist of
+ * only expressions).
  *
  * By definition, replication identity of a rel meets all limitations associated
  * with that. Note that any other index could also meet these limitations.
@@ -77,6 +131,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		Oid			opfamily;
 		RegProcedure regop;
 		int			table_attno = indkey->values[index_attoff];
+		int			eq_strategy;
 
 		if (!AttributeNumberIsValid(table_attno))
 		{
@@ -93,20 +148,22 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		 */
 		optype = get_opclass_input_type(opclass->values[index_attoff]);
 		opfamily = get_opclass_family(opclass->values[index_attoff]);
+		eq_strategy = get_equal_strategy_number(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
-									   BTEqualStrategyNumber);
+									   eq_strategy);
+
 		if (!OidIsValid(operator))
 			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
-				 BTEqualStrategyNumber, optype, optype, opfamily);
+				 eq_strategy, optype, optype, opfamily);
 
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
 		ScanKeyInit(&skey[skey_attoff],
 					index_attoff + 1,
-					BTEqualStrategyNumber,
+					eq_strategy,
 					regop,
 					searchslot->tts_values[table_attno - 1]);
 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 57ad22b48a..5f5da4c9d8 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -17,6 +17,7 @@
 
 #include "postgres.h"
 
+#include "access/amapi.h"
 #include "access/genam.h"
 #include "access/table.h"
 #include "catalog/namespace.h"
@@ -779,8 +780,8 @@ RemoteRelContainsLeftMostColumnOnIdx(IndexInfo *indexInfo, AttrMap *attrmap)
 
 /*
  * Returns the oid of an index that can be used by the apply worker to scan
- * the relation. The index must be btree, non-partial, and have at least
- * one column reference (i.e. cannot consist of only expressions). These
+ * the relation. The index must be btree or hash, non-partial, and have at
+ * least one column reference (i.e. cannot consist of only expressions). These
  * limitations help to keep the index scan similar to PK/RI index scans.
  *
  * Note that the limitations of index scans for replica identity full only
@@ -841,11 +842,25 @@ FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
 bool
 IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
 {
-	bool		is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
-	bool		is_partial = (indexInfo->ii_Predicate != NIL);
-	bool		is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
+	IndexAmRoutine *routine;
+	bool		is_suitable_type;
+	bool		is_partial;
+	bool		is_only_on_expression;
 
-	return is_btree && !is_partial && !is_only_on_expression;
+	/*
+	 * Only access methods that implement the amgettuple API are supported,
+	 * because RelationFindReplTupleByIndex() assumes that it can fetch
+	 * matching tuples one at a time through that API.
+	 */
+	routine = GetIndexAmRoutineByAmId(indexInfo->ii_Am, false);
+	is_suitable_type = ((routine->amgettuple != NULL) &&
+						(get_equal_strategy_number_for_am(indexInfo->ii_Am)
+						 != InvalidStrategy));
+
+	is_partial = (indexInfo->ii_Predicate != NIL);
+	is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
+
+	return is_suitable_type && !is_partial && !is_only_on_expression;
 }
 
 /*
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 60978f9415..05813d98ae 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -1255,6 +1255,28 @@ get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype)
 	return true;
 }
 
+/*
+ * get_opclass_method
+ *
+ *		Returns the OID of the index access method the opclass belongs to.
+ */
+Oid
+get_opclass_method(Oid opclass)
+{
+	HeapTuple	opctup;
+	Oid			amoid;
+
+	opctup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
+	if (!HeapTupleIsValid(opctup))
+		elog(ERROR, "cache lookup failed for opclass %u", opclass);
+
+	/* Copy the AM OID out before releasing the cache entry. */
+	amoid = ((Form_pg_opclass) GETSTRUCT(opctup))->opcmethod;
+	ReleaseSysCache(opctup);
+
+	return amoid;
+}
+
 /*				---------- OPERATOR CACHE ----------					 */
 
 /*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index ac02247947..3a23510478 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -666,6 +666,9 @@ extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
 									 const char *relname);
 
+extern int get_equal_strategy_number(Oid opclass);
+extern int get_equal_strategy_number_for_am(Oid am);
+
 /*
  * prototypes from functions in nodeModifyTable.c
  */
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 4f5418b972..ea8fbe42cd 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -106,6 +106,7 @@ extern Oid	get_opclass_family(Oid opclass);
 extern Oid	get_opclass_input_type(Oid opclass);
 extern bool get_opclass_opfamily_and_input_type(Oid opclass,
 												Oid *opfamily, Oid *opcintype);
+extern Oid get_opclass_method(Oid opclass);
 extern RegProcedure get_opcode(Oid opno);
 extern char *get_opname(Oid opno);
 extern Oid	get_op_rettype(Oid opno);
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index bd673a9d68..5fa5f0a790 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_column_list.pl',
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
+      't/034_hash.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/034_hash.pl b/src/test/subscription/t/034_hash.pl
new file mode 100644
index 0000000000..3aead6958b
--- /dev/null
+++ b/src/test/subscription/t/034_hash.pl
@@ -0,0 +1,93 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# Test that logical replication can use a hash index on the subscriber
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname = 'tap_sub';
+my $result = '';
+
+# =============================================================================
+# Testcase start: Subscription can use hash index
+#
+
+# create the table on both nodes; only the subscriber gets the hash index
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full USING HASH (x)");
+
+# insert some initial data
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i, (i%10)::text FROM generate_series(0,10) i"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# delete 2 rows
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x IN (5, 6)");
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = 100, y = '200' WHERE x IN (1, 2)");
+
+# wait until the hash index has been scanned once per changed row (2 + 2)
+# XXX: the test will be suspended here
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until('postgres',
+	q{select (idx_scan = 4) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+  )
+  or die
+  "Timed out while waiting for check subscriber tap_sub_rep_full deletes 2 rows and updates 2 rows via index";
+
+# make sure that the subscriber has the correct data after the UPDATE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE (x = 100 and y = '200')"
+);
+is($result, qq(2),
+	'ensure subscriber has the updated rows after the UPDATE');
+
+# make sure that the subscriber has the correct data after the DELETE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x in (5, 6)");
+is($result, qq(0),
+	'ensure subscriber no longer has the rows removed by the DELETE');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Subscription can use hash index
+# =============================================================================
+
+done_testing();
-- 
2.27.0

