On Tue, 2022-01-18 at 17:53 +0800, Julien Rouhaud wrote:
> > Review on patch 0001 would be helpful. It makes decoding just
> > another
> > method of rmgr, which makes a lot of sense to me from a code
> > organization standpoint regardless of the other patches. Is there
> > any
> > reason not to do that?
> 
> I think that it's a clean and sensible approach, so +1 for me.

Thank you, committed 0001. Other patches not committed yet.

> There's a bit of 0003 present in 0002:

I just squashed 0002 and 0003 together. Not large enough to keep
separate.

> A few random comments on 0003:
> 
>  #define RM_MAX_ID              (RM_NEXT_ID - 1)
> +#define RM_CUSTOM_MIN_ID       128
> +#define RM_CUSTOM_MAX_ID       UINT8_MAX
> 
> It would be a good idea to add a StaticAssertStmt here to make sure
> that
> there's no overlap in the ranges.

Done.

> +
> +/*
> + * RmgrId to use for extensions that require an RmgrId, but are
> still in
> + * development and have not reserved their own unique RmgrId yet.
> See:
> + * https://wiki.postgresql.org/wiki/ExtensibleRmgr
> + */
> +#define RM_EXPERIMENTAL_ID     128
> 
> I'm a bit dubious about the whole "register your ID in the Wiki"
> approach.  It
> might not be a problem since there probably won't be hundreds of
> users, and I
> don't have any better suggestion since it has to be consistent across
> nodes.

Agree, but I don't see a better approach, either.

I do some sanity checking, which should catch collisions when they
happen.

> +   elog(LOG, "registering customer rmgr \"%s\" with ID %d",
> +        rmgr->rm_name, rmid);
> 
> Should it be a DEBUG message instead?  Also s/customer/custom/

It seems like a fairly important thing to have in the log. Only a
couple of extensions will ever encounter this message, and only at
server start.

Typo fixed.

> +RmgrData
> +GetCustomRmgr(RmgrId rmid)
> +{
> +   if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
> +       elog(PANIC, "custom rmgr id %d out of range", rmid);
> 
> Should this be an assert?

If we make it an Assert, then it won't be caught in production builds.

> +#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]
> +#define GetRmgr(rmid) ((rmid < RM_CUSTOM_MIN_ID) ? \
> +                      GetBuiltinRmgr(rmid) : GetCustomRmgr(rmid))
> 
> rmid should be protected in the macro.

Done.

> How do you plan to mark it experimental?

I suppose it doesn't need to be marked explicitly -- there are other
APIs that change. For instance, the ProcessUtility_hook changed, and
that's used much more widely.

As long as we generally agree that some kind of custom rmgrs are the
way to go, if we change the API or implementation around from version
to version I can easily update my table access method.

Regards,
        Jeff Davis

From 62799e4546aa0a15b2a09f6b14900d785d64f42f Mon Sep 17 00:00:00 2001
From: Jeff Davis <j...@j-davis.com>
Date: Sat, 6 Nov 2021 13:01:38 -0700
Subject: [PATCH] Extensible rmgr.

Allow extensions to specify a new custom rmgr, which allows
specialized WAL. This is meant to be used by a custom Table Access
Method, which would not otherwise be able to offer logical
decoding/replication. It may also be used by new Index Access Methods.

Prior to this commit, only Generic WAL was available, which offers
support for recovery and physical replication but not logical
replication.

Reviewed-by: Julien Rouhaud
Discussion: https://postgr.es/m/ed1fb2e22d15d3563ae0eb610f7b61bb15999c0a.camel%40j-davis.com
---
 src/backend/access/transam/rmgr.c        | 91 ++++++++++++++++++++++++
 src/backend/access/transam/xlog.c        | 22 +++---
 src/backend/access/transam/xlogreader.c  |  2 +-
 src/backend/replication/logical/decode.c |  4 +-
 src/backend/utils/misc/guc.c             |  6 +-
 src/include/access/rmgr.h                | 14 ++++
 src/include/access/xlog_internal.h       |  7 ++
 7 files changed, 129 insertions(+), 17 deletions(-)

diff --git a/src/backend/access/transam/rmgr.c b/src/backend/access/transam/rmgr.c
index f8847d5aebf..e04492a9507 100644
--- a/src/backend/access/transam/rmgr.c
+++ b/src/backend/access/transam/rmgr.c
@@ -24,12 +24,19 @@
 #include "commands/dbcommands_xlog.h"
 #include "commands/sequence.h"
 #include "commands/tablespace.h"
+#include "miscadmin.h"
 #include "replication/decode.h"
 #include "replication/message.h"
 #include "replication/origin.h"
 #include "storage/standby.h"
+#include "utils/memutils.h"
 #include "utils/relmapper.h"
 
+typedef struct CustomRmgrEntry {	/* one slot of CustomRmgrTable */
+	RmgrId rmid;		/* ID the entry was registered under */
+	RmgrData *rmgr;		/* pointer passed to RegisterCustomRmgr(), not a copy */
+} CustomRmgrEntry;
+
 /* must be kept in sync with RmgrData definition in xlog_internal.h */
 #define PG_RMGR(symname,name,redo,desc,identify,startup,cleanup,mask,decode) \
 	{ name, redo, desc, identify, startup, cleanup, mask, decode },
@@ -37,3 +44,87 @@
 const RmgrData RmgrTable[RM_MAX_ID + 1] = {
 #include "access/rmgrlist.h"
 };
+
+static CustomRmgrEntry *CustomRmgrTable = NULL;
+static int NumCustomRmgrs = 0;
+
+/*
+ * Register a new custom rmgr; only legal while shared_preload_libraries is
+ * being processed. The rmgr is stored by pointer (not copied), and an ID or
+ * name that collides with an existing rmgr is rejected with PANIC.
+ *
+ * Reserve a unique RmgrId at https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ * to avoid conflicts; during development, use RM_EXPERIMENTAL_ID.
+ */
+void
+RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr)
+{
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	if (!process_shared_preload_libraries_in_progress)
+		ereport(ERROR,
+				(errmsg("custom rmgr must be registered while initializing modules in shared_preload_libraries")));
+
+	ereport(LOG, (errmsg("registering custom rmgr \"%s\" with ID %d",
+						 rmgr->rm_name, rmid)));
+
+	if (CustomRmgrTable == NULL)
+		CustomRmgrTable = MemoryContextAllocZero(TopMemoryContext, sizeof(CustomRmgrEntry));
+
+	/* check for existing builtin rmgr with the same name */
+	for (int i = 0; i <= RM_MAX_ID; i++)
+	{
+		const RmgrData *existing_rmgr = &RmgrTable[i];
+
+		if (!strcmp(existing_rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" has the same name as builtin rmgr",
+							existing_rmgr->rm_name)));
+	}
+
+	/* check for conflicting custom rmgrs already registered */
+	for (int i = 0; i < NumCustomRmgrs; i++)
+	{
+		CustomRmgrEntry entry = CustomRmgrTable[i];
+
+		if (entry.rmid == rmid)
+			ereport(PANIC,
+					(errmsg("custom rmgr ID %d already registered with name \"%s\"",
+							rmid, entry.rmgr->rm_name)));
+
+		if (!strcmp(entry.rmgr->rm_name, rmgr->rm_name))
+			ereport(PANIC,
+					(errmsg("custom rmgr \"%s\" already registered with ID %d",
+							rmgr->rm_name, entry.rmid)));
+	}
+
+	/* grow to NumCustomRmgrs + 1 entries; the byte count must scale the sum */
+	CustomRmgrTable = (CustomRmgrEntry *) repalloc(
+		CustomRmgrTable, sizeof(CustomRmgrEntry) * (NumCustomRmgrs + 1));
+
+	CustomRmgrTable[NumCustomRmgrs].rmid = rmid;
+	CustomRmgrTable[NumCustomRmgrs].rmgr = rmgr;
+	NumCustomRmgrs++;
+}
+
+/*
+ * GetCustomRmgr
+ *
+ * Return a copy of the custom rmgr registered under rmid, found by a linear
+ * scan of CustomRmgrTable; PANIC if rmid is out of range or unregistered.
+ */
+RmgrData
+GetCustomRmgr(RmgrId rmid)
+{
+	const CustomRmgrEntry *slot = CustomRmgrTable;
+
+	if (rmid < RM_CUSTOM_MIN_ID || rmid > RM_CUSTOM_MAX_ID)
+		ereport(PANIC, errmsg("custom rmgr id %d is out of range", rmid));
+
+	for (int n = NumCustomRmgrs; n > 0; n--, slot++)
+		if (slot->rmid == rmid)
+			return *slot->rmgr;
+
+	ereport(PANIC, errmsg("custom rmgr with ID %d not found!", rmid));
+}
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index dfe2a0bcce9..b56c7927194 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1531,10 +1531,10 @@ checkXLogConsistency(XLogReaderState *record)
 		 * If masking function is defined, mask both the primary and replay
 		 * images
 		 */
-		if (RmgrTable[rmid].rm_mask != NULL)
+		if (GetRmgr(rmid).rm_mask != NULL)
 		{
-			RmgrTable[rmid].rm_mask(replay_image_masked, blkno);
-			RmgrTable[rmid].rm_mask(primary_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(replay_image_masked, blkno);
+			GetRmgr(rmid).rm_mask(primary_image_masked, blkno);
 		}
 
 		/* Time to compare the primary and replay images. */
@@ -7493,8 +7493,8 @@ StartupXLOG(void)
 		/* Initialize resource managers */
 		for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 		{
-			if (RmgrTable[rmid].rm_startup != NULL)
-				RmgrTable[rmid].rm_startup();
+			if (GetRmgr(rmid).rm_startup != NULL)
+				GetRmgr(rmid).rm_startup();
 		}
 
 		/*
@@ -7716,7 +7716,7 @@ StartupXLOG(void)
 					RecordKnownAssignedTransactionIds(record->xl_xid);
 
 				/* Now apply the WAL record itself */
-				RmgrTable[record->xl_rmid].rm_redo(xlogreader);
+				GetRmgr(record->xl_rmid).rm_redo(xlogreader);
 
 				/*
 				 * After redo, check whether the backup pages associated with
@@ -7826,8 +7826,8 @@ StartupXLOG(void)
 			/* Allow resource managers to do any required cleanup. */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (RmgrTable[rmid].rm_cleanup != NULL)
-					RmgrTable[rmid].rm_cleanup();
+				if (GetRmgr(rmid).rm_cleanup != NULL)
+					GetRmgr(rmid).rm_cleanup();
 			}
 
 			ereport(LOG,
@@ -10739,16 +10739,16 @@ xlog_outdesc(StringInfo buf, XLogReaderState *record)
 	uint8		info = XLogRecGetInfo(record);
 	const char *id;
 
-	appendStringInfoString(buf, RmgrTable[rmid].rm_name);
+	appendStringInfoString(buf, GetRmgr(rmid).rm_name);
 	appendStringInfoChar(buf, '/');
 
-	id = RmgrTable[rmid].rm_identify(info);
+	id = GetRmgr(rmid).rm_identify(info);
 	if (id == NULL)
 		appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK);
 	else
 		appendStringInfo(buf, "%s: ", id);
 
-	RmgrTable[rmid].rm_desc(buf, record);
+	GetRmgr(rmid).rm_desc(buf, record);
 }
 
 
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 35029cf97d6..612b1b37233 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -738,7 +738,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
 							  (uint32) SizeOfXLogRecord, record->xl_tot_len);
 		return false;
 	}
-	if (record->xl_rmid > RM_MAX_ID)
+	if (record->xl_rmid > RM_MAX_ID && record->xl_rmid < RM_CUSTOM_MIN_ID)
 	{
 		report_invalid_record(state,
 							  "invalid resource manager ID %u at %X/%X",
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3fb5a92a1a1..52257a06882 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -115,8 +115,8 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
 
 	rmid = XLogRecGetRmid(record);
 
-	if (RmgrTable[rmid].rm_decode != NULL)
-		RmgrTable[rmid].rm_decode(ctx, &buf);
+	if (GetRmgr(rmid).rm_decode != NULL)
+		GetRmgr(rmid).rm_decode(ctx, &buf);
 	else
 	{
 		/* just deal with xid, and done */
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b3fd42e0f18..68971b0f1ea 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -11715,7 +11715,7 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 		if (pg_strcasecmp(tok, "all") == 0)
 		{
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
-				if (RmgrTable[rmid].rm_mask != NULL)
+				if (GetRmgr(rmid).rm_mask != NULL)
 					newwalconsistency[rmid] = true;
 			found = true;
 		}
@@ -11727,8 +11727,8 @@ check_wal_consistency_checking(char **newval, void **extra, GucSource source)
 			 */
 			for (rmid = 0; rmid <= RM_MAX_ID; rmid++)
 			{
-				if (pg_strcasecmp(tok, RmgrTable[rmid].rm_name) == 0 &&
-					RmgrTable[rmid].rm_mask != NULL)
+				if (pg_strcasecmp(tok, GetRmgr(rmid).rm_name) == 0 &&
+					GetRmgr(rmid).rm_mask != NULL)
 				{
 					newwalconsistency[rmid] = true;
 					found = true;
diff --git a/src/include/access/rmgr.h b/src/include/access/rmgr.h
index d9b512630ca..d387530c6f5 100644
--- a/src/include/access/rmgr.h
+++ b/src/include/access/rmgr.h
@@ -31,5 +31,19 @@ typedef enum RmgrIds
 #undef PG_RMGR
 
 #define RM_MAX_ID				(RM_NEXT_ID - 1)
+#define RM_CUSTOM_MIN_ID		128
+#define RM_CUSTOM_MAX_ID		UINT8_MAX
+
+StaticAssertDecl(RM_MAX_ID < RM_CUSTOM_MIN_ID,
+				 "RM_MAX_ID >= RM_CUSTOM_MIN_ID");
+StaticAssertDecl(RM_CUSTOM_MIN_ID < RM_CUSTOM_MAX_ID,
+				 "RM_CUSTOM_MIN_ID >= RM_CUSTOM_MAX_ID");
+
+/*
+ * RmgrId to use for extensions that require an RmgrId, but are still in
+ * development and have not reserved their own unique RmgrId yet. See:
+ * https://wiki.postgresql.org/wiki/ExtensibleRmgr
+ */
+#define RM_EXPERIMENTAL_ID		128
 
 #endif							/* RMGR_H */
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 849954a8e5a..61f421c98c2 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -321,6 +321,9 @@ typedef struct RmgrData
 
 extern const RmgrData RmgrTable[];
 
+extern RmgrData GetCustomRmgr(RmgrId rmid);
+extern void RegisterCustomRmgr(RmgrId rmid, RmgrData *rmgr);
+
 /*
  * Exported to support xlog switching from checkpointer
  */
@@ -338,4 +341,8 @@ extern bool InArchiveRecovery;
 extern bool StandbyMode;
 extern char *recoveryRestoreCommand;
 
+#define GetBuiltinRmgr(rmid) RmgrTable[(rmid)]	/* direct index, no range check */
+#define GetRmgr(rmid) (((rmid) < RM_CUSTOM_MIN_ID) ? \
+					   GetBuiltinRmgr((rmid)) : GetCustomRmgr((rmid)))	/* rmid is evaluated twice */
+
 #endif							/* XLOG_INTERNAL_H */
-- 
2.17.1

Reply via email to