From 119d7ed341477b1c5f644ae289bf8cb01ad5bc5a Mon Sep 17 00:00:00 2001
From: Daniel Gustafsson <daniel@yesql.se>
Date: Wed, 31 Oct 2018 00:12:39 +0100
Subject: [PATCH 1/2] Add infrastructure for signalling backends with payload

This adds an API to include a payload consisting of a message, an
elevel and a custom sqlerrcode, when signalling a backend. The main
usecase is to be able to include context to the user as to why the
connection is terminated or query cancelled.
---
 src/backend/storage/ipc/Makefile         |   2 +-
 src/backend/storage/ipc/ipci.c           |   3 +
 src/backend/storage/ipc/signal_message.c | 286 +++++++++++++++++++++++++++++++
 src/backend/utils/init/postinit.c        |   2 +
 src/include/storage/signal_message.h     |  29 ++++
 5 files changed, 321 insertions(+), 1 deletion(-)
 create mode 100644 src/backend/storage/ipc/signal_message.c
 create mode 100644 src/include/storage/signal_message.h

diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile
index 49e7c9f15e..fd628af59f 100644
--- a/src/backend/storage/ipc/Makefile
+++ b/src/backend/storage/ipc/Makefile
@@ -10,6 +10,6 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = barrier.o dsm_impl.o dsm.o ipc.o ipci.o latch.o pmsignal.o procarray.o \
 	procsignal.o shmem.o shmqueue.o shm_mq.o shm_toc.o signalfuncs.o \
-	sinval.o sinvaladt.o standby.o
+	signal_message.o sinval.o sinvaladt.o standby.o
 
 include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 0c86a581c0..fb7c1a0b49 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -33,6 +33,7 @@
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
 #include "replication/origin.h"
+#include "storage/signal_message.h"
 #include "storage/bufmgr.h"
 #include "storage/dsm.h"
 #include "storage/ipc.h"
@@ -150,6 +151,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
 		size = add_size(size, BackendRandomShmemSize());
+		size = add_size(size, BackendSignalFeedbackShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -270,6 +272,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	SyncScanShmemInit();
 	AsyncShmemInit();
 	BackendRandomShmemInit();
+	BackendSignalFeedbackShmemInit();
 
 #ifdef EXEC_BACKEND
 
diff --git a/src/backend/storage/ipc/signal_message.c b/src/backend/storage/ipc/signal_message.c
new file mode 100644
index 0000000000..596f9584cc
--- /dev/null
+++ b/src/backend/storage/ipc/signal_message.c
@@ -0,0 +1,286 @@
+/*-------------------------------------------------------------------------
+ *
+ * signal_message.c
+ *	  Functions for sending a message to a signalled backend
+ *
+ * This file contains routines to handle registering an optional message when
+ * cancelling, or terminating, a backend as well changing the sqlerrcode used.
+ * The combined payload of message/errcode is referred to as feedback.  The
+ * message will be stored in shared memory and is limited to MAX_CANCEL_MSG
+ * characters including the NULL terminator.
+ *
+ * Access to the feedback slots is protected by spinlocks.
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  src/backend/storage/ipc/signal_message.c
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "storage/ipc.h"
+#include "storage/shmem.h"
+#include "storage/signal_message.h"
+#include "storage/spin.h"
+
+
+/*
+ * Structure for registering a feedback payload to be sent to a cancelled, or
+ * terminated backend. Each backend is registered per pid in the array which is
+ * indexed by Backend ID. Reading and writing the message is protected by a
+ * per-slot spinlock.
+ */
+typedef struct
+{
+	pid_t	dest_pid;		/* The pid of the process being signalled */
+	pid_t	src_pid;		/* The pid of the processing signalling */
+	slock_t	mutex;			/* Per-slot protection */
+	char	message[MAX_CANCEL_MSG]; /* Message to send to signalled backend */
+	int		orig_length;	/* Length of the message as passed by the user,
+							 * if this length exceeds MAX_CANCEL_MSG it will
+							 * be truncated but we store the original length
+							 * in order to be able to convey truncation */
+	int		sqlerrcode;		/* errcode to use when signalling backend */
+	int		elevel;			/* elevel to use when signalling backend */
+} BackendSignalFeedbackShmemStruct;
+
+static BackendSignalFeedbackShmemStruct	*BackendSignalFeedbackSlots = NULL;
+static volatile BackendSignalFeedbackShmemStruct *MyCancelSlot = NULL;
+static void CleanupBackendSignalFeedback(int status, Datum argument);
+static int backend_feedback(pid_t backend_pid, char *message, int sqlerrcode,
+							int elevel);
+
+/*
+ * Return the required size for the cancelation feedback Shmem area.
+ */
+Size
+BackendSignalFeedbackShmemSize(void)
+{
+	return MaxBackends * sizeof(BackendSignalFeedbackShmemStruct);
+}
+
+/*
+ * Create and initialize the Shmem structure for holding the feedback, the
+ * bookkeeping for them and the spinlocks associated.
+ */
+void
+BackendSignalFeedbackShmemInit(void)
+{
+	Size	size = BackendSignalFeedbackShmemSize();
+	bool	found;
+	int		i;
+
+	BackendSignalFeedbackSlots = (BackendSignalFeedbackShmemStruct *)
+		ShmemInitStruct("BackendSignalFeedbackSlots", size, &found);
+
+	if (!found)
+	{
+		MemSet(BackendSignalFeedbackSlots, 0, size);
+
+		for (i = 0; i < MaxBackends; i++)
+			SpinLockInit(&(BackendSignalFeedbackSlots[i].mutex));
+	}
+}
+
+/*
+ * Set up the slot for the current backend_id
+ */
+void
+BackendSignalFeedbackInit(int backend_id)
+{
+	volatile BackendSignalFeedbackShmemStruct *slot;
+
+	slot = &BackendSignalFeedbackSlots[backend_id - 1];
+
+	slot->message[0] = '\0';
+	slot->orig_length = 0;
+	slot->sqlerrcode = 0;
+	slot->elevel = 0;
+	slot->dest_pid = MyProcPid;
+
+	MyCancelSlot = slot;
+
+	on_shmem_exit(CleanupBackendSignalFeedback, Int32GetDatum(backend_id));
+}
+
+/*
+ * Ensure that the slot is purged and emptied at exit. Any message gets
+ * overwritten with null chars to avoid risking exposing a message intended for
+ * another backend to a new backend.
+ */
+static void
+CleanupBackendSignalFeedback(int status, Datum argument)
+{
+	int backend_id = DatumGetInt32(argument);
+	volatile BackendSignalFeedbackShmemStruct *slot;
+
+	slot = &BackendSignalFeedbackSlots[backend_id - 1];
+
+	Assert(slot == MyCancelSlot);
+
+	MyCancelSlot = NULL;
+
+	if (slot->orig_length > 0)
+		MemSet(slot->message, '\0', sizeof(slot->message));
+
+	slot->orig_length = 0;
+	slot->sqlerrcode = 0;
+	slot->elevel = 0;
+	slot->dest_pid = 0;
+	slot->src_pid = 0;
+}
+
+/*
+ * Set a message for the cancellation of the backend with the specified pid,
+ * using the default sqlerrcode.
+ */
+int
+SetBackendCancelMessage(pid_t backend_pid, char *message)
+{
+	return backend_feedback(backend_pid, message, ERRCODE_QUERY_CANCELED,
+							ERROR);
+}
+
+/*
+ * Set a message for the termination of the backend with the specified pid,
+ * using the default sqlerrcode.
+ */
+int
+SetBackendTerminationMessage(pid_t backend_pid, char *message)
+{
+	return backend_feedback(backend_pid, message, ERRCODE_ADMIN_SHUTDOWN,
+							FATAL);
+}
+
+/*
+ * Set both a message, elevel and a sqlerrcode for use when signalling the
+ * backend with the specified pid.
+ */
+int
+SetBackendSignalFeedback(pid_t backend_pid, char *message, int sqlerrcode,
+						 int elevel)
+{
+	return backend_feedback(backend_pid, message, sqlerrcode, elevel);
+}
+
+/*
+ * Sets a cancellation message for the backend with the specified pid, and
+ * returns zero on success. If the backend isn't found, or no message is
+ * passed, 1 is returned.  If two backends collide in setting a message, the
+ * existing message will be overwritten by the last one in. The message will
+ * be truncated to fit within MAX_CANCEL_MSG bytes.
+ */
+static int
+backend_feedback(pid_t backend_pid, char *message, int sqlerrcode, int elevel)
+{
+	int		i;
+	int		len;
+
+	if (!message)
+		return 1;
+
+	len = pg_mbcliplen(message, strlen(message), MAX_CANCEL_MSG - 1);
+
+	for (i = 0; i < MaxBackends; i++)
+	{
+		BackendSignalFeedbackShmemStruct *slot = &BackendSignalFeedbackSlots[i];
+
+		if (slot->dest_pid != 0 && slot->dest_pid == backend_pid)
+		{
+			SpinLockAcquire(&slot->mutex);
+			if (slot->dest_pid != backend_pid)
+			{
+				SpinLockRelease(&slot->mutex);
+				return 1;
+			}
+
+			memcpy(slot->message, message, len);
+			slot->orig_length = pg_mbstrlen(message);
+			slot->sqlerrcode = sqlerrcode;
+			slot->elevel = elevel;
+			slot->src_pid = MyProcPid;
+			SpinLockRelease(&slot->mutex);
+
+			if (len != strlen(message))
+				ereport(NOTICE,
+						(errmsg("message is too long and has been truncated")));
+			return 0;
+		}
+	}
+
+	return 1;
+}
+
+/*
+ * HasBackendSignalFeedback
+ *		Test if there is a backend signalling feedback to consume
+ *
+ * Test whether there is feedback registered for the current backend that can
+ * be consumed and presented to the user. It isn't strictly required to call
+ * this function prior to consuming a potential message, but since consuming it
+ * will clear it there can be cases where one would like to peek first.
+ */
+bool
+HasBackendSignalFeedback(void)
+{
+	volatile BackendSignalFeedbackShmemStruct *slot = MyCancelSlot;
+	bool 	has_message = false;
+
+	if (slot != NULL)
+	{
+		SpinLockAcquire(&slot->mutex);
+		has_message = ((slot->orig_length > 0) && (slot->sqlerrcode != 0));
+		SpinLockRelease(&slot->mutex);
+	}
+
+	return has_message;
+}
+
+/*
+ * ConsumeBackendSignalFeedback
+ *		Read and clear backend signalling feedback
+ *
+ * Return the configured signal feedback in buffer, which is buf_len bytes in
+ * size.  The original length of the message is returned, or zero in case no
+ * message was found. If the returned length exceeds that of Min(buf_len,
+ * MAX_CANCEL_MSG), then truncation has been performed. The feedback (message
+ * and errcode) is cleared on consumption. There is no point in passing a
+ * buffer larger than MAX_CANCEL_MSG as that is the upper bound on what will be
+ * stored in the slot.
+ */
+int
+ConsumeBackendSignalFeedback(char *buffer, size_t buf_len, int *sqlerrcode,
+							 pid_t *pid, int *elevel)
+{
+	volatile BackendSignalFeedbackShmemStruct *slot = MyCancelSlot;
+	int		msg_length = 0;
+
+	if (slot != NULL && slot->orig_length > 0)
+	{
+		SpinLockAcquire(&slot->mutex);
+		if (buffer && buf_len)
+			strlcpy(buffer, (const char *) slot->message, buf_len);
+		msg_length = slot->orig_length;
+		if (sqlerrcode)
+			*sqlerrcode = slot->sqlerrcode;
+		if (pid)
+			*pid = slot->src_pid;
+		if (elevel)
+			*elevel = slot->elevel;
+
+		slot->orig_length = 0;
+		/* Avoid risking to leak any part of a previously set message */
+		MemSet(slot->message, '\0', sizeof(slot->message));
+		slot->sqlerrcode = 0;
+		slot->elevel = 0;
+		SpinLockRelease(&slot->mutex);
+	}
+
+	return msg_length;
+}
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 4f1d2a0d28..4382691420 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -40,6 +40,7 @@
 #include "postmaster/autovacuum.h"
 #include "postmaster/postmaster.h"
 #include "replication/walsender.h"
+#include "storage/signal_message.h"
 #include "storage/bufmgr.h"
 #include "storage/fd.h"
 #include "storage/ipc.h"
@@ -780,6 +781,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 		PerformAuthentication(MyProcPort);
 		InitializeSessionUserId(username, useroid);
 		am_superuser = superuser();
+		BackendSignalFeedbackInit(MyBackendId);
 	}
 
 	/*
diff --git a/src/include/storage/signal_message.h b/src/include/storage/signal_message.h
new file mode 100644
index 0000000000..e0f834f2f5
--- /dev/null
+++ b/src/include/storage/signal_message.h
@@ -0,0 +1,29 @@
+/*-------------------------------------------------------------------------
+ *
+ * signal_message.h
+ *		Declarations for sending a message to a signalled backend
+ *
+ * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
+ *
+ *	  src/include/storage/signal_message.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef SIGNAL_MESSAGE_H
+#define SIGNAL_MESSAGE_H
+
+#define MAX_CANCEL_MSG 128
+
+extern Size BackendSignalFeedbackShmemSize(void);
+extern void BackendSignalFeedbackShmemInit(void);
+extern void BackendSignalFeedbackInit(int backend_id);
+
+extern int SetBackendCancelMessage(pid_t backend, char *message);
+extern int SetBackendTerminationMessage(pid_t backend, char *message);
+extern int SetBackendSignalFeedback(pid_t backend, char *message,
+									int sqlerrcode, int elevel);
+extern bool HasBackendSignalFeedback(void);
+extern int ConsumeBackendSignalFeedback(char *msg, size_t len, int *sqlerrcode,
+										pid_t *pid, int *elevel);
+
+#endif							/* SIGNAL_MESSAGE_H */
-- 
2.14.1.145.gb3622a4ee

