From 473f9086b81260ba1751d1343d7fe0ef8615e943 Mon Sep 17 00:00:00 2001
From: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Date: Mon, 19 Jun 2023 08:28:00 +0000
Subject: [PATCH] Allow to use Hash index on subscriber

---
 src/backend/executor/execReplication.c     | 52 +++++++++++--
 src/backend/replication/logical/relation.c |  9 ++-
 src/backend/utils/cache/lsyscache.c        | 22 ++++++
 src/include/utils/lsyscache.h              |  1 +
 src/test/subscription/meson.build          |  1 +
 src/test/subscription/t/034_hash.pl        | 91 ++++++++++++++++++++++
 6 files changed, 166 insertions(+), 10 deletions(-)
 create mode 100644 src/test/subscription/t/034_hash.pl

diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 9dd7168461..0200017ae7 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -19,6 +19,8 @@
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_am_d.h"
+#include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -41,15 +43,50 @@
 static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
 						 TypeCacheEntry **eq);
 
+/*
+ * Return the appropriate strategy number which corresponds to the equality
+ * comparisons.
+ *
+ * TODO: support other indexes: GiST, GIN, SP-GiST, BRIN
+ */
+static int
+get_equal_strategy_number(Oid opclass)
+{
+	Oid am_method = get_opclass_method(opclass);
+	int ret;
+
+	switch (am_method)
+	{
+		case BTREE_AM_OID:
+			ret = BTEqualStrategyNumber;
+			break;
+		case HASH_AM_OID:
+			ret = HTEqualStrategyNumber;
+			break;
+		case GIST_AM_OID:
+		case GIN_AM_OID:
+		case SPGIST_AM_OID:
+		case BRIN_AM_OID:
+			/* TODO */
+		default:
+			/* XXX: Do we have to support extended indexes? */
+			ret = InvalidStrategy;
+			break;
+	}
+
+	return ret;
+}
+
+
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
  * is setup to match 'rel' (*NOT* idxrel!).
  *
  * Returns how many columns to use for the index scan.
  *
- * This is not generic routine, it expects the idxrel to be a btree, non-partial
- * and have at least one column reference (i.e. cannot consist of only
- * expressions).
+ * This is not generic routine, it expects the idxrel to be a btree or a hash,
+ * non-partial and have at least one column reference (i.e. cannot consist of
+ * only expressions).
  *
  * By definition, replication identity of a rel meets all limitations associated
  * with that. Note that any other index could also meet these limitations.
@@ -77,6 +114,7 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		Oid			opfamily;
 		RegProcedure regop;
 		int			table_attno = indkey->values[index_attoff];
+		int			strategy_number;
 
 		if (!AttributeNumberIsValid(table_attno))
 		{
@@ -93,20 +131,22 @@ build_replindex_scan_key(ScanKey skey, Relation rel, Relation idxrel,
 		 */
 		optype = get_opclass_input_type(opclass->values[index_attoff]);
 		opfamily = get_opclass_family(opclass->values[index_attoff]);
+		strategy_number = get_equal_strategy_number(opclass->values[index_attoff]);
 
 		operator = get_opfamily_member(opfamily, optype,
 									   optype,
-									   BTEqualStrategyNumber);
+									   strategy_number);
+
 		if (!OidIsValid(operator))
 			elog(ERROR, "missing operator %d(%u,%u) in opfamily %u",
-				 BTEqualStrategyNumber, optype, optype, opfamily);
+				 strategy_number, optype, optype, opfamily);
 
 		regop = get_opcode(operator);
 
 		/* Initialize the scankey. */
 		ScanKeyInit(&skey[skey_attoff],
 					index_attoff + 1,
-					BTEqualStrategyNumber,
+					strategy_number,
 					regop,
 					searchslot->tts_values[table_attno - 1]);
 
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index 57ad22b48a..38c8cebfdf 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -779,8 +779,8 @@ RemoteRelContainsLeftMostColumnOnIdx(IndexInfo *indexInfo, AttrMap *attrmap)
 
 /*
  * Returns the oid of an index that can be used by the apply worker to scan
- * the relation. The index must be btree, non-partial, and have at least
- * one column reference (i.e. cannot consist of only expressions). These
+ * the relation. The index must be btree or hash, non-partial, and have at
+ * least one column reference (i.e. cannot consist of only expressions). These
  * limitations help to keep the index scan similar to PK/RI index scans.
  *
  * Note that the limitations of index scans for replica identity full only
@@ -841,11 +841,12 @@ FindUsableIndexForReplicaIdentityFull(Relation localrel, AttrMap *attrmap)
 bool
 IsIndexUsableForReplicaIdentityFull(IndexInfo *indexInfo)
 {
-	bool		is_btree = (indexInfo->ii_Am == BTREE_AM_OID);
+	bool		is_suitable_type = (indexInfo->ii_Am == BTREE_AM_OID) ||
+								   (indexInfo->ii_Am == HASH_AM_OID);
 	bool		is_partial = (indexInfo->ii_Predicate != NIL);
 	bool		is_only_on_expression = IsIndexOnlyOnExpression(indexInfo);
 
-	return is_btree && !is_partial && !is_only_on_expression;
+	return is_suitable_type && !is_partial && !is_only_on_expression;
 }
 
 /*
diff --git a/src/backend/utils/cache/lsyscache.c b/src/backend/utils/cache/lsyscache.c
index 60978f9415..9a44e7ad54 100644
--- a/src/backend/utils/cache/lsyscache.c
+++ b/src/backend/utils/cache/lsyscache.c
@@ -1255,6 +1255,28 @@ get_opclass_opfamily_and_input_type(Oid opclass, Oid *opfamily, Oid *opcintype)
 	return true;
 }
 
+/*
+ * get_opclass_method
+ *
+ *		Returns the OID of the index access method operator family is for.
+ */
+Oid
+get_opclass_method(Oid opclass)
+{
+	HeapTuple	tp;
+	Form_pg_opclass cla_tup;
+	Oid			result;
+
+	tp = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclass));
+	if (!HeapTupleIsValid(tp))
+		elog(ERROR, "cache lookup failed for opclass %u", opclass);
+	cla_tup = (Form_pg_opclass) GETSTRUCT(tp);
+
+	result = cla_tup->opcmethod;
+	ReleaseSysCache(tp);
+	return result;
+}
+
 /*				---------- OPERATOR CACHE ----------					 */
 
 /*
diff --git a/src/include/utils/lsyscache.h b/src/include/utils/lsyscache.h
index 4f5418b972..ea8fbe42cd 100644
--- a/src/include/utils/lsyscache.h
+++ b/src/include/utils/lsyscache.h
@@ -106,6 +106,7 @@ extern Oid	get_opclass_family(Oid opclass);
 extern Oid	get_opclass_input_type(Oid opclass);
 extern bool get_opclass_opfamily_and_input_type(Oid opclass,
 												Oid *opfamily, Oid *opcintype);
+extern Oid get_opclass_method(Oid opclass);
 extern RegProcedure get_opcode(Oid opno);
 extern char *get_opname(Oid opno);
 extern Oid	get_op_rettype(Oid opno);
diff --git a/src/test/subscription/meson.build b/src/test/subscription/meson.build
index bd673a9d68..5fa5f0a790 100644
--- a/src/test/subscription/meson.build
+++ b/src/test/subscription/meson.build
@@ -40,6 +40,7 @@ tests += {
       't/031_column_list.pl',
       't/032_subscribe_use_index.pl',
       't/033_run_as_table_owner.pl',
+      't/034_hash.pl',
       't/100_bugs.pl',
     ],
   },
diff --git a/src/test/subscription/t/034_hash.pl b/src/test/subscription/t/034_hash.pl
new file mode 100644
index 0000000000..e7babcafa7
--- /dev/null
+++ b/src/test/subscription/t/034_hash.pl
@@ -0,0 +1,91 @@
+# Copyright (c) 2022-2023, PostgreSQL Global Development Group
+
+# Test logical replication behavior with subscriber using available index
+use strict;
+use warnings;
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# create publisher node
+my $node_publisher = PostgreSQL::Test::Cluster->new('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# create subscriber node
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+my $appname = 'tap_sub';
+my $result = '';
+
+# =============================================================================
+# Testcase start: Subscription can use hash index
+#
+
+# create tables pub and sub
+$node_publisher->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_publisher->safe_psql('postgres',
+	"ALTER TABLE test_replica_id_full REPLICA IDENTITY FULL");
+$node_subscriber->safe_psql('postgres',
+	"CREATE TABLE test_replica_id_full (x int, y text)");
+$node_subscriber->safe_psql('postgres',
+	"CREATE INDEX test_replica_id_full_idx ON test_replica_id_full USING HASH (x)");
+
+# insert some initial data within the range 0-9 for y
+$node_publisher->safe_psql('postgres',
+	"INSERT INTO test_replica_id_full SELECT i, (i%10)::text FROM generate_series(0,10) i"
+);
+
+# create pub/sub
+$node_publisher->safe_psql('postgres',
+	"CREATE PUBLICATION tap_pub_rep_full FOR TABLE test_replica_id_full");
+$node_subscriber->safe_psql('postgres',
+	"CREATE SUBSCRIPTION tap_sub_rep_full CONNECTION '$publisher_connstr application_name=$appname' PUBLICATION tap_pub_rep_full"
+);
+
+# wait for initial table synchronization to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher, $appname);
+
+# delete 2 rows
+$node_publisher->safe_psql('postgres',
+	"DELETE FROM test_replica_id_full WHERE x IN (5, 6)");
+
+# update 2 rows
+$node_publisher->safe_psql('postgres',
+	"UPDATE test_replica_id_full SET x = 100, y = '200' WHERE x IN (1, 2)");
+
+# wait until the index is used on the subscriber
+# XXX: the test will be suspended here
+$node_publisher->wait_for_catchup($appname);
+$node_subscriber->poll_query_until('postgres',
+	q{select (idx_scan = 4) from pg_stat_all_indexes where indexrelname = 'test_replica_id_full_idx';}
+  )
+  or die
+  "Timed out while waiting for check subscriber tap_sub_rep_full updates 4 rows via index";
+
+# make sure that the subscriber has the correct data after the UPDATE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full WHERE (x = 100 and y = '200')"
+);
+is($result, qq(2),
+	'ensure subscriber has the correct data at the end of the test');
+
+# make sure that the subscriber has the correct data after the first DELETE
+$result = $node_subscriber->safe_psql('postgres',
+	"select count(*) from test_replica_id_full where x in (5, 6)");
+is($result, qq(0),
+	'ensure subscriber has the correct data at the end of the test');
+
+# cleanup pub
+$node_publisher->safe_psql('postgres', "DROP PUBLICATION tap_pub_rep_full");
+$node_publisher->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+# cleanup sub
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub_rep_full");
+$node_subscriber->safe_psql('postgres', "DROP TABLE test_replica_id_full");
+
+# Testcase end: Subscription can use index
+# =============================================================================
-- 
2.27.0

