commit a5af30148e560a83bfb845c98ecae4951eb60c34
Author: Kaiting Chen <ktchen14@gmail.com>
Date:   Mon Jun 6 17:33:57 2022 -0400

    Add feature

diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 2de0ebacec..e2da2165be 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -378,7 +378,10 @@ static int	transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
 									   int16 *attnums, Oid *atttypids,
 									   Oid *opclasses);
 static Oid	transformFkeyCheckAttrs(Relation pkrel,
-									int numattrs, int16 *attnums,
+									int numattrs, int16 *attnums, Oid *pktypoid,
+									Oid *opclasses);
+static Oid	transformFkeyCheckIndex(Relation pkrel, const char *pk_indexname,
+									int numattrs, int16 *attnums, Oid *pktypoid,
 									Oid *opclasses);
 static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
 static CoercionPathType findFkeyCast(Oid targetTypeId, Oid sourceTypeId,
@@ -9151,9 +9154,20 @@ ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
 		numpks = transformColumnNameList(RelationGetRelid(pkrel),
 										 fkconstraint->pk_attrs,
 										 pkattnum, pktypoid);
-		/* Look for an index matching the column list */
-		indexOid = transformFkeyCheckAttrs(pkrel, numpks, pkattnum,
-										   opclasses);
+
+		/*
+		 * Look for an index matching the column list. If an explicit index name
+		 * is specified in the constraint then use that index.
+		 */
+		if (fkconstraint->pk_indexname == NULL)
+			indexOid = transformFkeyCheckAttrs(pkrel,
+											   numpks, pkattnum, pktypoid,
+											   opclasses);
+		else
+			indexOid = transformFkeyCheckIndex(pkrel,
+											   fkconstraint->pk_indexname,
+											   numpks, pkattnum, pktypoid,
+											   opclasses);
 	}
 
 	/*
@@ -11291,7 +11305,7 @@ transformFkeyGetPrimaryKey(Relation pkrel, Oid *indexOid,
  */
 static Oid
 transformFkeyCheckAttrs(Relation pkrel,
-						int numattrs, int16 *attnums,
+						int numattrs, int16 *attnums, Oid *pktypoid,
 						Oid *opclasses) /* output parameter */
 {
 	Oid			indexoid = InvalidOid;
@@ -11405,6 +11419,85 @@ transformFkeyCheckAttrs(Relation pkrel,
 			break;
 	}
 
+	/*
+	 * If the search doesn't find an index with the same key columns as the
+	 * referenced columns, then as a fallback, (re)test the primary key against
+	 * a subset of the referenced columns.
+	 */
+	if (!found && OidIsValid(indexoid = RelationGetPrimaryKeyIndex(pkrel)))
+	{
+		HeapTuple		indexTuple;
+		Form_pg_index	indexStruct;
+
+		indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+		if (!HeapTupleIsValid(indexTuple))
+			elog(ERROR, "cache lookup failed for index %u", indexoid);
+		indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+
+		/*
+		 * A primary key index must be unique, valid, expressionless, and
+		 * predicateless.
+		 */
+		Assert(indexStruct->indisunique);
+		Assert(indexStruct->indisvalid);
+		Assert(heap_attisnull(indexTuple, Anum_pg_index_indexprs, NULL));
+		Assert(heap_attisnull(indexTuple, Anum_pg_index_indpred, NULL));
+
+		if (indexStruct->indnkeyatts < numattrs && indexStruct->indimmediate)
+		{
+			Datum		indclassDatum;
+			bool		isnull;
+			oidvector  *indclass;
+
+			indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+											Anum_pg_index_indclass, &isnull);
+			Assert(!isnull);
+			indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+			found = true;
+			for (j = 0; j < indexStruct->indnkeyatts; j++)
+			{
+				for (i = 0; i < numattrs; i++)
+				{
+					if (attnums[i] == indexStruct->indkey.values[j])
+						goto next_indkey;
+				}
+
+				found = false;
+				break;
+
+			next_indkey:
+				opclasses[i] = indclass->values[j];
+			}
+		}
+
+		if (found)
+		{
+			HeapTuple	classTuple;
+			Oid			amid;
+
+			classTuple = SearchSysCache1(RELOID, ObjectIdGetDatum(indexoid));
+			if (!HeapTupleIsValid(classTuple))
+				elog(ERROR, "cache lookup failed for relation %u", indexoid);
+			amid = ((Form_pg_class) GETSTRUCT(classTuple))->relam;
+			ReleaseSysCache(classTuple);
+
+			/*
+			* Use the default opclass for the column type with the same access
+			* method as the index for each referenced column that isn't an index
+			* column.
+			*/
+			for (i = 0; i < numattrs; i++)
+			{
+				if (opclasses[i] != InvalidOid)
+					continue;
+				opclasses[i] = ResolveOpClass(NIL, pktypoid[i], "btree", amid);
+			}
+		}
+
+		ReleaseSysCache(indexTuple);
+	}
+
 	if (!found)
 	{
 		if (found_deferrable)
@@ -11424,6 +11517,186 @@ transformFkeyCheckAttrs(Relation pkrel,
 	return indexoid;
 }
 
+/*
+ * transformFkeyCheckIndex -
+ *
+ *  Make sure that the specified index is usable with this foreign key
+ *  constraint. Return the OID of the index supporting the constraint, as well
+ *  as the opclasses associated with the referenced columns.
+ */
+static Oid
+transformFkeyCheckIndex(Relation pkrel, const char *pk_indexname,
+						int numattrs, int16 *attnums, Oid *pktypoid,
+						Oid *opclasses) /* output parameter */
+{
+	Oid				indexoid = InvalidOid,
+					amid;
+	HeapTuple		classTuple,
+					indexTuple;
+	Form_pg_class	classStruct;
+	Form_pg_index	indexStruct;
+	int				i,
+					j;
+	Datum			indclassDatum;
+	bool			isnull;
+	oidvector  		*indclass;
+
+	/*
+	 * Reject duplicate appearances of columns in the referenced-columns list.
+	 * Such a case is forbidden by the SQL standard, and even if we thought it
+	 * useful to allow it, there would be ambiguity about how to match the
+	 * list to unique indexes (in particular, it'd be unclear which index
+	 * opclass goes with which FK column).
+	 */
+	for (i = 0; i < numattrs; i++)
+	{
+		for (j = i + 1; j < numattrs; j++)
+		{
+			if (attnums[i] == attnums[j])
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_FOREIGN_KEY),
+						 errmsg("foreign key referenced-columns list must not contain duplicates")));
+		}
+	}
+
+	/* Look for the index in the same schema as the table */
+	classTuple = SearchSysCache2(RELNAMENSP, PointerGetDatum(pk_indexname),
+								 ObjectIdGetDatum(RelationGetNamespace(pkrel)));
+	if (!HeapTupleIsValid(classTuple))
+		ereport(ERROR,
+				errcode(ERRCODE_UNDEFINED_OBJECT),
+				errmsg("index \"%s\" for table \"%s\" does not exist",
+					   pk_indexname, RelationGetRelationName(pkrel)));
+	classStruct = (Form_pg_class) GETSTRUCT(classTuple);
+
+	if (classStruct->relkind != RELKIND_INDEX &&
+		classStruct->relkind != RELKIND_PARTITIONED_INDEX)
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("\"%s\" is not an index", pk_indexname));
+
+	indexoid = classStruct->oid;
+	amid = classStruct->relam;
+	ReleaseSysCache(classTuple);
+
+	indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid));
+	if (!HeapTupleIsValid(indexTuple))
+		elog(ERROR, "cache lookup failed for index %u", indexoid);
+	indexStruct = (Form_pg_index) GETSTRUCT(indexTuple);
+
+	/*
+	 * The index must belong to the primary key relation. It can't have more key
+	 * columns than are referenced. It must be unique and not a partial index;
+	 * forget it if there are any expressions, too. Invalid indexes are out as
+	 * well.
+	 */
+	if (indexStruct->indrelid != RelationGetRelid(pkrel))
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("index \"%s\" does not belong to table \"%s\"",
+					   pk_indexname, RelationGetRelationName(pkrel)));
+
+	/* TODO */
+	if (indexStruct->indnkeyatts > numattrs)
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("\"%s\" is not a usable index with given keys for referenced table \"%s\"",
+					   pk_indexname, RelationGetRelationName(pkrel)),
+				errdetail("Cannot create a foreign key constraint using an "
+						  "index unless the index's key columns are a subset "
+						  "of the referenced columns"));
+
+	if (!indexStruct->indisvalid)
+		ereport(ERROR,
+				errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				errmsg("index \"%s\" is not valid", pk_indexname));
+
+	if (!indexStruct->indisunique)
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("\"%s\" is not a unique index",
+					   pk_indexname),
+				errdetail("Cannot create a foreign key constraint using such an index."));
+
+	if (!heap_attisnull(indexTuple, Anum_pg_index_indexprs, NULL))
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("index \"%s\" contains expressions",
+					   pk_indexname),
+				errdetail("Cannot create a foreign key constraint using such an index."));
+
+	if (!heap_attisnull(indexTuple, Anum_pg_index_indpred, NULL))
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("\"%s\" is a partial index",
+					   pk_indexname),
+				errdetail("Cannot create a foreign key constraint using such an index."));
+
+	/*
+	 * Also, refuse to use a deferrable unique/primary key. This is per SQL
+	 * spec, and there would be a lot of interesting semantic problems if we
+	 * tried to allow it.
+	 */
+	if (!indexStruct->indimmediate)
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("\"%s\" is a deferrable index",
+					   pk_indexname),
+				errdetail("Cannot create a foreign key constraint using such an index."));
+
+	/* Must get indclass the hard way */
+	indclassDatum = SysCacheGetAttr(INDEXRELID, indexTuple,
+									Anum_pg_index_indclass, &isnull);
+	Assert(!isnull);
+	indclass = (oidvector *) DatumGetPointer(indclassDatum);
+
+	/*
+	 * The given attnum list may match the index columns in any order. Check for
+	 * a match, and extract the appropriate opclasses while we're at it.
+	 *
+	 * We know that attnums[] is duplicate-free per the test at the start of
+	 * this function, and we checked above that the number of index columns
+	 * is no more than the number of referenced columns, so if we find a match
+	 * for an attnums[] entry then we must have a unique match in some order.
+	 */
+	for (j = 0; j < indexStruct->indnkeyatts; j++)
+	{
+		for (i = 0; i < numattrs; i++)
+		{
+			if (attnums[i] == indexStruct->indkey.values[j])
+				goto next_indkey;
+		}
+
+		/* TODO */
+		ereport(ERROR,
+				errcode(ERRCODE_WRONG_OBJECT_TYPE),
+				errmsg("\"%s\" is not a usable index with given keys for referenced table \"%s\"",
+					   pk_indexname, RelationGetRelationName(pkrel)),
+				errdetail("Cannot create a foreign key constraint using an "
+						  "index unless the index's key columns are a subset "
+						  "of the referenced columns"));
+
+	next_indkey:
+		opclasses[i] = indclass->values[j];
+	}
+
+	/*
+	 * Use the default opclass for the column type with the same access method
+	 * as the index for each referenced column that isn't an index column.
+	 */
+	for (i = 0; i < numattrs; i++)
+	{
+		if (opclasses[i] != InvalidOid)
+			continue;
+
+		/* Currently no index AMs other than btree support unique indexes */
+		opclasses[i] = ResolveOpClass(NIL, pktypoid[i], "btree", amid);
+	}
+
+	ReleaseSysCache(indexTuple);
+	return indexoid;
+}
+
 /*
  * findFkeyCast -
  *
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 51d630fa89..31ca1d29c2 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -3601,6 +3601,7 @@ _copyConstraint(const Constraint *from)
 	COPY_NODE_FIELD(pktable);
 	COPY_NODE_FIELD(fk_attrs);
 	COPY_NODE_FIELD(pk_attrs);
+	COPY_STRING_FIELD(pk_indexname);
 	COPY_SCALAR_FIELD(fk_matchtype);
 	COPY_SCALAR_FIELD(fk_upd_action);
 	COPY_SCALAR_FIELD(fk_del_action);
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index e747e1667d..30131e1056 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -3089,6 +3089,7 @@ _equalConstraint(const Constraint *a, const Constraint *b)
 	COMPARE_NODE_FIELD(pktable);
 	COMPARE_NODE_FIELD(fk_attrs);
 	COMPARE_NODE_FIELD(pk_attrs);
+	COMPARE_STRING_FIELD(pk_indexname);
 	COMPARE_SCALAR_FIELD(fk_matchtype);
 	COMPARE_SCALAR_FIELD(fk_upd_action);
 	COMPARE_SCALAR_FIELD(fk_del_action);
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index ce12915592..59166f0ffa 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -3927,6 +3927,7 @@ _outConstraint(StringInfo str, const Constraint *node)
 			WRITE_NODE_FIELD(pktable);
 			WRITE_NODE_FIELD(fk_attrs);
 			WRITE_NODE_FIELD(pk_attrs);
+			WRITE_STRING_FIELD(pk_indexname);
 			WRITE_CHAR_FIELD(fk_matchtype);
 			WRITE_CHAR_FIELD(fk_upd_action);
 			WRITE_CHAR_FIELD(fk_del_action);
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 969c9c158f..0222d6f20e 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -595,7 +595,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 %type <node>	TableConstraint TableLikeClause
 %type <ival>	TableLikeOptionList TableLikeOption
-%type <str>		column_compression opt_column_compression
+%type <str>		column_compression opt_column_compression key_index
 %type <list>	ColQualList
 %type <node>	ColConstraint ColConstraintElem ConstraintAttr
 %type <ival>	key_match
@@ -3997,7 +3997,7 @@ ColConstraintElem:
 
 					$$ = (Node *) n;
 				}
-			| REFERENCES qualified_name opt_column_list key_match key_actions
+			| REFERENCES qualified_name opt_column_list key_index key_match key_actions
 				{
 					Constraint *n = makeNode(Constraint);
 
@@ -4006,10 +4006,11 @@ ColConstraintElem:
 					n->pktable = $2;
 					n->fk_attrs = NIL;
 					n->pk_attrs = $3;
-					n->fk_matchtype = $4;
-					n->fk_upd_action = ($5)->updateAction->action;
-					n->fk_del_action = ($5)->deleteAction->action;
-					n->fk_del_set_cols = ($5)->deleteAction->cols;
+					n->pk_indexname = $4;
+					n->fk_matchtype = $5;
+					n->fk_upd_action = ($6)->updateAction->action;
+					n->fk_del_action = ($6)->deleteAction->action;
+					n->fk_del_set_cols = ($6)->deleteAction->cols;
 					n->skip_validation = false;
 					n->initially_valid = true;
 					$$ = (Node *) n;
@@ -4229,7 +4230,7 @@ ConstraintElem:
 					$$ = (Node *) n;
 				}
 			| FOREIGN KEY '(' columnList ')' REFERENCES qualified_name
-				opt_column_list key_match key_actions ConstraintAttributeSpec
+				opt_column_list key_index key_match key_actions ConstraintAttributeSpec
 				{
 					Constraint *n = makeNode(Constraint);
 
@@ -4238,11 +4239,12 @@ ConstraintElem:
 					n->pktable = $7;
 					n->fk_attrs = $4;
 					n->pk_attrs = $8;
-					n->fk_matchtype = $9;
-					n->fk_upd_action = ($10)->updateAction->action;
-					n->fk_del_action = ($10)->deleteAction->action;
-					n->fk_del_set_cols = ($10)->deleteAction->cols;
-					processCASbits($11, @11, "FOREIGN KEY",
+					n->pk_indexname	= $9;
+					n->fk_matchtype = $10;
+					n->fk_upd_action = ($11)->updateAction->action;
+					n->fk_del_action = ($11)->deleteAction->action;
+					n->fk_del_set_cols = ($11)->deleteAction->cols;
+					processCASbits($12, @12, "FOREIGN KEY",
 								   &n->deferrable, &n->initdeferred,
 								   &n->skip_validation, NULL,
 								   yyscanner);
@@ -4275,6 +4277,16 @@ opt_c_include:	INCLUDE '(' columnList ')'			{ $$ = $3; }
 			 |		/* EMPTY */						{ $$ = NIL; }
 		;
 
+key_index:	USING INDEX name
+			{
+				$$ = $3;
+			}
+		| /* EMPTY */
+			{
+				$$ = NULL;
+			}
+		;
+
 key_match:  MATCH FULL
 			{
 				$$ = FKCONSTR_MATCH_FULL;
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index c3937a60fd..12abea444c 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -2231,6 +2231,9 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
 		case CONSTRAINT_FOREIGN:
 			{
 				Datum		val;
+				Oid			indexId;
+				HeapTuple	indtup;
+				int			nKeys;
 				bool		isnull;
 				const char *string;
 
@@ -2244,7 +2247,7 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
 					elog(ERROR, "null conkey for constraint %u",
 						 constraintId);
 
-				decompile_column_index_array(val, conForm->conrelid, &buf);
+				nKeys = decompile_column_index_array(val, conForm->conrelid, &buf);
 
 				/* add foreign relation name */
 				appendStringInfo(&buf, ") REFERENCES %s(",
@@ -2258,10 +2261,37 @@ pg_get_constraintdef_worker(Oid constraintId, bool fullCommand,
 					elog(ERROR, "null confkey for constraint %u",
 						 constraintId);
 
-				decompile_column_index_array(val, conForm->confrelid, &buf);
+				nKeys = decompile_column_index_array(val, conForm->confrelid, &buf);
 
 				appendStringInfoChar(&buf, ')');
 
+				indexId = conForm->conindid;
+
+				/* Add USING INDEX clause */
+				indtup = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexId));
+				if (!HeapTupleIsValid(indtup))
+					elog(ERROR, "cache lookup failed for index %u", indexId);
+
+				val = SysCacheGetAttr(INDEXRELID, indtup,
+									  Anum_pg_index_indisprimary, &isnull);
+				if (isnull)
+					elog(ERROR, "null indisprimary for index %u", indexId);
+
+				if (!DatumGetBool(val))
+				{
+					val = SysCacheGetAttr(INDEXRELID, indtup,
+										  Anum_pg_index_indnkeyatts, &isnull);
+					if (isnull)
+						elog(ERROR, "null indnkeyatts for index %u", indexId);
+
+					if (DatumGetInt32(val) < nKeys)
+						appendStringInfo(&buf, " USING INDEX %s",
+										 generate_relation_name(conForm->conindid,
+																NIL));
+				}
+
+				ReleaseSysCache(indtup);
+
 				/* Add match type */
 				switch (conForm->confmatchtype)
 				{
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 73f635b455..0b18c4b8de 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -2646,6 +2646,7 @@ typedef struct Constraint
 	RangeVar   *pktable;		/* Primary key table */
 	List	   *fk_attrs;		/* Attributes of foreign key */
 	List	   *pk_attrs;		/* Corresponding attrs in PK table */
+	char	   *pk_indexname;	/* Primary key index name, or NULL if unspecified */
 	char		fk_matchtype;	/* FULL, PARTIAL, SIMPLE */
 	char		fk_upd_action;	/* ON UPDATE action */
 	char		fk_del_action;	/* ON DELETE action */
