From adca9c34d807d37b3226ea83800f0d6187586b73 Mon Sep 17 00:00:00 2001
From: "kuroda.hayato%40jp.fujitsu.com" <kuroda.hayato@jp.fujitsu.com>
Date: Wed, 21 Sep 2022 06:02:45 +0000
Subject: [PATCH v15 1/4] Add an infrastracture for checking remote servers

This patch adds a mechanism for registering callback functions.
They should be used for checking health of remote servers.

These functions will be called when an added flag is set to true.
The flag is expected that it is set from signal handlers, which is registered by FDWs.
Inside the function a signal SIGINT should be raised and a message should be set to QueryCancelMessage
if one of remote servers is disconnected.

When a query is canceled and a string is set to QueryCancelMessage,
the server will output the given message to the log instead of the normal message.

Note that QueryCancelMessage will be never pfree()'d.
Developers must use appropriate memory context.
---
 src/backend/foreign/foreign.c    | 57 ++++++++++++++++++++++++++++++++
 src/backend/tcop/postgres.c      | 26 +++++++++++++++
 src/backend/utils/init/globals.c |  1 +
 src/include/foreign/foreign.h    | 19 +++++++++++
 src/include/miscadmin.h          |  2 ++
 src/include/tcop/tcopprot.h      |  1 +
 6 files changed, 106 insertions(+)

diff --git a/src/backend/foreign/foreign.c b/src/backend/foreign/foreign.c
index 353e20a0cf..fbc943bea9 100644
--- a/src/backend/foreign/foreign.c
+++ b/src/backend/foreign/foreign.c
@@ -28,7 +28,9 @@
 #include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/varlena.h"
+#include "utils/timeout.h"
 
+static CheckingRemoteServersCallbackItem *remote_check_callbacks = NULL;
 
 /*
  * GetForeignDataWrapper -	look up the foreign-data wrapper by OID.
@@ -810,3 +812,58 @@ GetExistingLocalJoinPath(RelOptInfo *joinrel)
 	}
 	return NULL;
 }
+
+
+/*
+ * Register callbacks for checking remote servers.
+ *
+ * This function is intended for use by FDW extensions.
+ */
+void
+RegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback,
+									  void *arg)
+{
+	CheckingRemoteServersCallbackItem *item;
+
+	item = (CheckingRemoteServersCallbackItem *)
+			MemoryContextAlloc(TopMemoryContext,
+			sizeof(CheckingRemoteServersCallbackItem));
+	item->callback = callback;
+	item->arg = arg;
+	item->next = remote_check_callbacks;
+	remote_check_callbacks = item;
+}
+
+void
+UnRegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback,
+										void *arg)
+{
+	CheckingRemoteServersCallbackItem *item;
+	CheckingRemoteServersCallbackItem *prev;
+
+	prev = NULL;
+	for (item = remote_check_callbacks; item; prev = item, item = item->next)
+	{
+			if (item->callback == callback && item->arg == arg)
+			{
+					if (prev)
+							prev->next = item->next;
+					else
+							remote_check_callbacks = item->next;
+					pfree(item);
+					break;
+			}
+	}
+}
+
+/*
+ * Call callbacks for checking remote servers.
+ */
+void
+CallCheckingRemoteServersCallbacks(void)
+{
+	CheckingRemoteServersCallbackItem *item;
+
+	for (item = remote_check_callbacks; item; item = item->next)
+		item->callback(item->arg);
+}
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 35eff28bd3..772a388a87 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -35,6 +35,7 @@
 #include "commands/async.h"
 #include "commands/prepare.h"
 #include "common/pg_prng.h"
+#include "foreign/foreign.h"
 #include "jit/jit.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -98,6 +99,9 @@ int			PostAuthDelay = 0;
 /* Time between checks that the client is still connected. */
 int			client_connection_check_interval = 0;
 
+/* Message string for canceling qurery caused by extensions */
+char		*QueryCancelMessage = NULL;
+
 /* ----------------
  *		private typedefs etc
  * ----------------
@@ -3224,6 +3228,13 @@ ProcessInterrupts(void)
 				 errmsg("connection to client lost")));
 	}
 
+	if (CheckingRemoteServersTimeoutPending)
+	{
+		CheckingRemoteServersTimeoutPending = false;
+
+		CallCheckingRemoteServersCallbacks();
+	}
+
 	/*
 	 * If a recovery conflict happens while we are waiting for input from the
 	 * client, the client is presumably just sitting idle in a transaction,
@@ -3328,8 +3339,20 @@ ProcessInterrupts(void)
 			LockErrorCleanup();
 			ereport(ERROR,
 					(errcode(ERRCODE_QUERY_CANCELED),
+					 QueryCancelMessage ?
+					 errmsg("%s", QueryCancelMessage) :
 					 errmsg("canceling statement due to user request")));
 		}
+		else if (QueryCancelMessage != NULL)
+		{
+			/*
+			 * If we reach here someone wanted to cancel query but it was skepped
+			 * because connection status was idle. So re-arm Pending flags
+			 * for next iteration.
+			 */
+			InterruptPending = true;
+			QueryCancelPending = true;
+		}
 	}
 
 	if (IdleInTransactionSessionTimeoutPending)
@@ -4264,6 +4287,9 @@ PostgresMain(const char *dbname, const char *username)
 		/* Report the error to the client and/or server log */
 		EmitErrorReport();
 
+		/* Make sure QueryCancelMessage is reset. */
+		QueryCancelMessage = NULL;
+
 		/*
 		 * Make sure debug_query_string gets reset before we possibly clobber
 		 * the storage it points at.
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 1a5d29ac9b..bb94adfea8 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -37,6 +37,7 @@ volatile sig_atomic_t IdleSessionTimeoutPending = false;
 volatile sig_atomic_t ProcSignalBarrierPending = false;
 volatile sig_atomic_t LogMemoryContextPending = false;
 volatile sig_atomic_t IdleStatsUpdateTimeoutPending = false;
+volatile sig_atomic_t CheckingRemoteServersTimeoutPending = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
 volatile uint32 CritSectionCount = 0;
diff --git a/src/include/foreign/foreign.h b/src/include/foreign/foreign.h
index ac82125530..9859513ac6 100644
--- a/src/include/foreign/foreign.h
+++ b/src/include/foreign/foreign.h
@@ -82,4 +82,23 @@ extern List *GetForeignColumnOptions(Oid relid, AttrNumber attnum);
 extern Oid	get_foreign_data_wrapper_oid(const char *fdwname, bool missing_ok);
 extern Oid	get_foreign_server_oid(const char *servername, bool missing_ok);
 
+
+/* Functions and variables for fdw checking */
+typedef void (*CheckingRemoteServersCallback) (void *arg);
+
+typedef struct CheckingRemoteServersCallbackItem CheckingRemoteServersCallbackItem;
+
+struct CheckingRemoteServersCallbackItem
+{
+	CheckingRemoteServersCallbackItem	*next;
+	CheckingRemoteServersCallback		callback;
+	void								*arg;
+};
+
+extern void RegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback,
+												  void *arg);
+extern void UnRegisterCheckingRemoteServersCallback(CheckingRemoteServersCallback callback,
+													void *arg);
+extern void CallCheckingRemoteServersCallbacks(void);
+
 #endif							/* FOREIGN_H */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index ee48e392ed..95b7d66ce3 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -99,6 +99,8 @@ extern PGDLLIMPORT volatile sig_atomic_t IdleStatsUpdateTimeoutPending;
 extern PGDLLIMPORT volatile sig_atomic_t CheckClientConnectionPending;
 extern PGDLLIMPORT volatile sig_atomic_t ClientConnectionLost;
 
+extern PGDLLIMPORT volatile sig_atomic_t CheckingRemoteServersTimeoutPending;
+
 /* these are marked volatile because they are examined by signal handlers: */
 extern PGDLLIMPORT volatile uint32 InterruptHoldoffCount;
 extern PGDLLIMPORT volatile uint32 QueryCancelHoldoffCount;
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 5d34978f32..3a8e8c5a19 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -30,6 +30,7 @@ extern PGDLLIMPORT const char *debug_query_string;
 extern PGDLLIMPORT int max_stack_depth;
 extern PGDLLIMPORT int PostAuthDelay;
 extern PGDLLIMPORT int client_connection_check_interval;
+extern PGDLLIMPORT char* QueryCancelMessage;
 
 /* GUC-configurable parameters */
 
-- 
2.27.0

