On Mon, 2022-10-31 at 03:20 +0000, shiy.f...@fujitsu.com wrote:
> On Sun, Oct 30, 2022 9:39 PM Tom Lane <t...@sss.pgh.pa.us> wrote:
> > 
> > What I'm wondering about is whether we can refactor this code
> > to avoid so many usually-useless catalog lookups.  Pulling the
> > namespace name, in particular, is expensive and we generally
> > are not going to need the result.  In the child-rel case it'd
> > be much better to pass the opened relation and let the error-check
> > subroutine work from that.  Maybe we should just do it like that
> > at the start of the recursion, too?  Or pass the relid and let
> > the subroutine look up the names only in the error case.
> > 
> > A completely different line of thought is that this doesn't seem
> > like a terribly bulletproof fix, since children could get added to
> > a partitioned table after we look.  Maybe it'd be better to check
> > the relkind at the last moment before we do something that depends
> > on a child table being a relation.
> > 
> 
> I agree. So maybe we can add this check in
> apply_handle_tuple_routing().
> 
> diff --git a/src/backend/replication/logical/worker.c
> b/src/backend/replication/logical/worker.c
> index 5250ae7f54..e941b68e4b 100644
> --- a/src/backend/replication/logical/worker.c
> +++ b/src/backend/replication/logical/worker.c
> @@ -2176,6 +2176,10 @@ apply_handle_tuple_routing(ApplyExecutionData
> *edata,
>         Assert(partrelinfo != NULL);
>         partrel = partrelinfo->ri_RelationDesc;
> 
> +       /* Check for supported relkind. */
> +       CheckSubscriptionRelkind(partrel->rd_rel->relkind,
> +                                                        relmapentry-
> >remoterel.nspname, relmapentry->remoterel.relname);
> +
>         /*
>          * To perform any of the operations below, the tuple must
> match the
>          * partition's rowtype. Convert if needed or just copy, using
> a dedicated
> 
> 
> Regards,
> Shi yu

I have verified that the current patch handles the attaching of new
partitions to the target partitioned table by throwing an error on
attempt to insert into a foreign table inside the logical replication
worker. I have refactored the code to minimize cache lookups, but I am
yet to write the tests for this. See the attached patch for the
refactored version.


From 56086185324e12e9db6add40f84bc7e60867f1d6 Mon Sep 17 00:00:00 2001
From: Ilya Gladyshev <ilya.v.gladys...@gmail.com>
Date: Tue, 1 Nov 2022 01:42:44 +0400
Subject: [PATCH v2] check relkind of subscription target recursively

---
 src/backend/commands/subscriptioncmds.c    | 18 ++++----
 src/backend/executor/execReplication.c     | 51 ++++++++++++++++++----
 src/backend/replication/logical/relation.c |  6 +--
 src/include/executor/executor.h            |  4 +-
 4 files changed, 57 insertions(+), 22 deletions(-)

diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f0cec2ad5e..2ae89f9e03 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -700,12 +700,13 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
 			{
 				RangeVar   *rv = (RangeVar *) lfirst(lc);
 				Oid			relid;
+				Relation rel;
 
 				relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-				/* Check for supported relkind. */
-				CheckSubscriptionRelkind(get_rel_relkind(relid),
-										 rv->schemaname, rv->relname);
+				rel = RelationIdGetRelation(relid);
+				/* Check for supported relkind recursively. */
+				CheckSubscriptionRelation(rel, rv->schemaname, rv->relname);
+				RelationClose(rel);
 
 				AddSubscriptionRelState(subid, relid, table_state,
 										InvalidXLogRecPtr);
@@ -861,12 +862,13 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 		{
 			RangeVar   *rv = (RangeVar *) lfirst(lc);
 			Oid			relid;
+			Relation rel;
 
 			relid = RangeVarGetRelid(rv, AccessShareLock, false);
-
-			/* Check for supported relkind. */
-			CheckSubscriptionRelkind(get_rel_relkind(relid),
-									 rv->schemaname, rv->relname);
+			rel = RelationIdGetRelation(relid);
+			/* Check for supported relkind recursively. */
+			CheckSubscriptionRelation(rel, rv->schemaname, rv->relname);
+			RelationClose(rel);
 
 			pubrel_local_oids[off++] = relid;
 
diff --git a/src/backend/executor/execReplication.c b/src/backend/executor/execReplication.c
index 6014f2e248..9da0b6b3a1 100644
--- a/src/backend/executor/execReplication.c
+++ b/src/backend/executor/execReplication.c
@@ -16,9 +16,11 @@
 
 #include "access/genam.h"
 #include "access/relscan.h"
+#include "access/table.h"
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "catalog/pg_inherits.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
@@ -645,20 +647,51 @@ CheckCmdReplicaIdentity(Relation rel, CmdType cmd)
 				 errhint("To enable deleting from the table, set REPLICA IDENTITY using ALTER TABLE.")));
 }
 
-
-/*
- * Check if we support writing into specific relkind.
- *
- * The nspname and relname are only needed for error reporting.
- */
-void
-CheckSubscriptionRelkind(char relkind, const char *nspname,
-						 const char *relname)
+static void
+CheckSubscriptionRelkind(Relation rel, const char *nspname, const char *relname)
 {
+	char relkind = rel->rd_rel->relkind;
+
 	if (relkind != RELKIND_RELATION && relkind != RELKIND_PARTITIONED_TABLE)
+	{
+		if (relname == NULL)
+			relname = RelationGetRelationName(rel);
+		if (nspname == NULL)
+			nspname = get_namespace_name(RelationGetNamespace(rel));
 		ereport(ERROR,
 				(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 				 errmsg("cannot use relation \"%s.%s\" as logical replication target",
 						nspname, relname),
 				 errdetail_relkind_not_supported(relkind)));
+	}
+}
+
+/*
+ * Recursively check if we support writing into specific relkind.
+ *
+ * The nspname and relname are only needed for error reporting.
+ */
+void
+CheckSubscriptionRelation(Relation rel, const char *nspname,
+						  const char *relname)
+{
+	CheckSubscriptionRelkind(rel, nspname, relname);
+
+	if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+	{
+		List *inheritors;
+		ListCell *lc;
+
+		inheritors = find_all_inheritors(RelationGetRelid(rel),
+										 AccessShareLock,
+										 NULL);
+		foreach (lc, inheritors)
+		{
+			Oid child_oid = lfirst_oid(lc);
+			Relation child_rel = RelationIdGetRelation(child_oid);
+
+			CheckSubscriptionRelkind(child_rel, NULL, NULL);
+			RelationClose(child_rel);
+		}
+	}
 }
diff --git a/src/backend/replication/logical/relation.c b/src/backend/replication/logical/relation.c
index e989047681..c1ef799084 100644
--- a/src/backend/replication/logical/relation.c
+++ b/src/backend/replication/logical/relation.c
@@ -392,9 +392,9 @@ logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
 		entry->localrel = table_open(relid, NoLock);
 		entry->localreloid = relid;
 
-		/* Check for supported relkind. */
-		CheckSubscriptionRelkind(entry->localrel->rd_rel->relkind,
-								 remoterel->nspname, remoterel->relname);
+		/* Check for supported relkind recursively. */
+		CheckSubscriptionRelation(entry->localrel,
+								  remoterel->nspname, remoterel->relname);
 
 		/*
 		 * Build the mapping of local attribute numbers to remote attribute
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index ed95ed1176..93de92864f 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -646,8 +646,8 @@ extern void ExecSimpleRelationDelete(ResultRelInfo *resultRelInfo,
 									 TupleTableSlot *searchslot);
 extern void CheckCmdReplicaIdentity(Relation rel, CmdType cmd);
 
-extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
-									 const char *relname);
+extern void CheckSubscriptionRelation(Relation rel, const char *nspname,
+									  const char *relname);
 
 /*
  * prototypes from functions in nodeModifyTable.c
-- 
2.30.2

Reply via email to