On Tue, Sep 8, 2015 at 11:49 AM, Shulgin, Oleksandr <
oleksandr.shul...@zalando.de> wrote:

>
> >> The real problem could be if the process that was signaled to connect
> to the message queue never handles the interrupt, and we keep waiting
> forever in shm_mq_receive().  We could add a timeout parameter or just let
> the user cancel the call: send a cancellation request, use
> pg_cancel_backend() or set statement_timeout before running this.
> >
> > This is a valid question - to begin with, we can use statement_timeout, and we
> don't need to design anything special (as long as you don't hold an important lock).
> > My example (the code is of prototype quality) is a little bit longer, but it
> works without a global lock - the requester doesn't block any other
>
> I'll update the commitfest patch to use this technique.
>

Please find attached v4.

--
Alex
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 32ac58f..a2b757c 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -139,6 +139,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, CmdStatusShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -243,6 +244,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	ReplicationOriginShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
+	CmdStatusShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 0abde43..e637be1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -26,6 +26,7 @@
 #include "storage/shmem.h"
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
+#include "utils/cmdstatus.h"
 
 
 /*
@@ -296,6 +297,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	if (CheckProcSignal(PROCSIG_CMD_STATUS_INFO))
+		HandleCmdStatusInfoInterrupt();
+
 	if (set_latch_on_sigusr1)
 		SetLatch(MyLatch);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index d917af3..1a5b03c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -67,6 +67,7 @@
 #include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
 #include "tcop/utility.h"
+#include "utils/cmdstatus.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -2991,6 +2992,9 @@ ProcessInterrupts(void)
 
 	if (ParallelMessagePending)
 		HandleParallelMessages();
+
+	if (CmdStatusInfoPending)
+		ProcessCmdStatusInfoRequest();
 }
 
 
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
index 3ed0b44..2c8687c 100644
--- a/src/backend/utils/adt/Makefile
+++ b/src/backend/utils/adt/Makefile
@@ -18,7 +18,7 @@ endif
 # keep this list arranged alphabetically or it gets to be a mess
 OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \
 	array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \
-	bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \
+	bool.o cash.o char.o cmdstatus.o date.o datetime.o datum.o dbsize.o domains.o \
 	encode.o enum.o expandeddatum.o \
 	float.o format_type.o formatting.o genfile.o \
 	geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \
diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c
new file mode 100644
index 0000000..c17de9d
--- /dev/null
+++ b/src/backend/utils/adt/cmdstatus.c
@@ -0,0 +1,647 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.c
+ *	  Definitions for pg_cmdstatus function.
+ *
+ * Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/adt/cmdstatus.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/htup_details.h"
+#include "commands/explain.h"
+#include "lib/stringinfo.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/dest.h"
+#include "tcop/pquery.h"
+#include "utils/builtins.h"
+#include "utils/cmdstatus.h"
+
+
+typedef enum {		/* kinds of information a backend can be asked for */
+	CMD_STATUS_REQUEST_EXPLAIN = 1,		/* EXPLAIN output for the active portal's query */
+	CMD_STATUS_REQUEST_QUERY_TEXT = 2,	/* source text of the active portal */
+	CMD_STATUS_REQUEST_PROGRESS_TAG = 3,	/* command tag, plus es_processed when available */
+	CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE = 4	/* EXPLAIN output for every query on current_query_stack */
+} CmdStatusInfoRequestType;
+
+#define CMD_STATUS_MAX_REQUEST CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE
+
+typedef enum {		/* outcome the replying backend stores in the requester's slot */
+	CMD_STATUS_RESULT_FAILURE = -1,		/* presumably a generic failure; handled by the default branch of report_result_code() */
+	CMD_STATUS_RESULT_SUCCESS = 0,		/* set by ProcessCmdStatusSlot() before it examines the request */
+	CMD_STATUS_RESULT_BACKEND_IDLE,		/* no active portal, or empty query stack for a backtrace request */
+	CMD_STATUS_RESULT_NO_COMMAND_TAG	/* the active portal has no command tag */
+} CmdStatusInfoResultCode;
+
+typedef struct		/* one element of the CmdStatusSlots array in shared memory */
+{
+	pid_t			css_pid;		/* PID of the process that owns this slot (the requester) */
+	pid_t			sender_css_pid;	/* PID of the backend asked to send the reply */
+	/*BackendId		sender_backend_id;*/
+	dsm_handle		dsm_seg_handle;	/* DSM segment that holds the reply queue */
+	CmdStatusInfoRequestType	request_type;	/* what the requester asked for */
+	CmdStatusInfoResultCode		result_code;	/* outcome, written by the replying backend */
+	bool			is_valid;		/* true while a request is published in this slot */
+	bool			is_processed;	/* set by the replying backend once it has handled the request */
+} CmdStatusSlot;
+
+#define NumCmdStatusSlots	(MaxBackends + NUM_AUXPROCTYPES)
+
+#define PG_CMD_STATUS_INFO_MAGIC	0x79fb2449
+#define CMD_STATUS_BUFFER_SIZE		1024
+
+/*
+ * These structs are allocated on the program stack as local variables in the
+ * ExecutorRun hook.  The top of stack is current_query_stack, see below.
+ */
+typedef struct CmdInfoStack {
+	QueryDesc			   *query_desc;	/* query passed to this ExecutorRun call */
+	struct CmdInfoStack	   *parent;		/* entry of the enclosing call, or NULL for the outermost one */
+} CmdInfoStack;
+
+
+static CmdStatusSlot *CmdStatusSlots = NULL;	/* shared array, set up by CmdStatusShmemInit() */
+static volatile CmdStatusSlot *MyCmdStatusSlot = NULL;	/* this process's slot, set by CmdStatusInit() */
+
+static CmdInfoStack	   *current_query_stack = NULL;	/* innermost running query, or NULL */
+static int				query_stack_size = 0;	/* number of entries on current_query_stack */
+
+static ExecutorRun_hook_type	prev_ExecutorRun = NULL;	/* hook found installed when CmdStatusInit() ran */
+
+static void cmdstatus_ExecutorRun(QueryDesc *queryDesc,
+								  ScanDirection direction, long count);
+
+
+Size		/* bytes of shared memory needed for the whole slot array */
+CmdStatusShmemSize(void)
+{
+	return NumCmdStatusSlots * sizeof(CmdStatusSlot);	/* NumCmdStatusSlots entries of one slot each */
+}
+
+void		/* set up the CmdStatusSlots array in shared memory */
+CmdStatusShmemInit(void)
+{
+	Size		size = CmdStatusShmemSize();
+	bool		found;		/* set by ShmemInitStruct(); tested below */
+
+	CmdStatusSlots = (CmdStatusSlot *)
+		ShmemInitStruct("CmdStatusSlots", size, &found);
+
+	if (!found)
+		MemSet(CmdStatusSlots, 0, size);	/* zero the array only when it was not found already */
+}
+
+/* on_shmem_exit callback: release this process's CmdStatus slot (arg is its 1-based index). */
+static void
+CleanupCmdStatusSlot(int status, Datum arg)
+{
+	int		css_idx = DatumGetInt32(arg);
+	volatile CmdStatusSlot *slot = &CmdStatusSlots[css_idx - 1];
+
+	Assert(slot == MyCmdStatusSlot);
+
+	MyCmdStatusSlot = NULL;
+
+	if (slot->css_pid != MyProcPid)
+	{
+		elog(LOG, "process %d releasing CmdStatus slot %d, but it contains %d",
+			 MyProcPid, css_idx, (int) slot->css_pid);
+		return;					/* XXX better to zero the slot anyway? */
+	}
+
+	slot->css_pid = 0;
+	slot->is_valid = false;
+}
+
+void		/* claim the slot with 1-based index css_idx for this process */
+CmdStatusInit(int css_idx)
+{
+	volatile CmdStatusSlot *slot;
+
+	slot = &CmdStatusSlots[css_idx - 1];	/* the array itself is 0-based */
+
+	if (slot->css_pid != 0)		/* a previous owner did not clear the slot; log it and take over anyway */
+		elog(LOG, "process %d taking over CmdStatus slot %d, but it's not empty",
+			 MyProcPid, css_idx);
+
+	slot->css_pid = MyProcPid;
+	slot->is_valid = false;		/* no request is published yet */
+
+	MyCmdStatusSlot = slot;
+
+	on_shmem_exit(CleanupCmdStatusSlot, Int32GetDatum(css_idx));	/* release the slot at exit */
+
+	/* Install our ExecutorRun hook, remembering the previous one so it can be chained. */
+	prev_ExecutorRun = ExecutorRun_hook;
+	ExecutorRun_hook = cmdstatus_ExecutorRun;
+}
+
+static int		/* returns whatever SendProcSignal() returns */
+SendCmdStatusInfoSignal(pid_t target_pid, BackendId target_backend_id,
+						dsm_handle dsm_seg_handle,
+						CmdStatusInfoRequestType request_type)
+{
+	MyCmdStatusSlot->sender_css_pid = target_pid;	/* the target is the sender of the reply */
+	/*MyCmdStatusSlot->sender_backend_id = target_backend_id;*/
+	MyCmdStatusSlot->dsm_seg_handle = dsm_seg_handle;
+	MyCmdStatusSlot->request_type = request_type;
+	MyCmdStatusSlot->is_processed = false;
+	MyCmdStatusSlot->is_valid = true;		/* must be set last: it publishes the request */
+	/* Signal the target; it scans all slots for one whose sender_css_pid is its own PID. */
+	return SendProcSignal(target_pid, PROCSIG_CMD_STATUS_INFO, target_backend_id);
+}
+
+/* Withdraw the request published in our own slot and clear its fields. */
+static void
+InvalidateCmdStatusSlot(void)
+{
+	MyCmdStatusSlot->is_valid = false;
+	MyCmdStatusSlot->is_processed = false;
+	MyCmdStatusSlot->sender_css_pid = 0;
+	MyCmdStatusSlot->dsm_seg_handle = 0;	/* dsm_handle is an integer, not a pointer */
+	MyCmdStatusSlot->request_type = 0;
+}
+
+
+
+static void		/* ExecutorRun hook: keeps current_query_stack in step with nested executor calls */
+cmdstatus_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+{
+	CmdInfoStack	current;		/* entry for this call; a local, so it must be popped before returning */
+
+	current.query_desc = queryDesc;
+	current.parent = current_query_stack;
+
+	current_query_stack = &current;	/* push */
+	query_stack_size++;
+
+	PG_TRY();
+	{
+		if (prev_ExecutorRun)		/* chain to the previously installed hook, if any */
+			prev_ExecutorRun(queryDesc, direction, count);
+		else
+			standard_ExecutorRun(queryDesc, direction, count);
+
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;			/* pop on normal completion */
+		current_query_stack = current.parent;
+	}
+	PG_CATCH();
+	{
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;			/* pop before the error is propagated */
+		current_query_stack = current.parent;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+
+static StringInfo		/* returns the EXPLAIN text; the caller frees it (data and struct) */
+explain_query(QueryDesc *queryDesc)
+{
+	StringInfo		str;
+	ExplainState   *es;
+
+	es = NewExplainState();
+	es->analyze = false;		/* plain text EXPLAIN with analyze, verbose and buffers off */
+	es->verbose = false;
+	es->buffers = false;
+	es->format = EXPLAIN_FORMAT_TEXT;
+
+	ExplainBeginOutput(es);
+	/* XXX: appendStringInfo(es->str, "#%d ", depth); ? */
+	ExplainQueryText(es, queryDesc);
+	ExplainPrintPlan(es, queryDesc);
+	ExplainEndOutput(es);
+
+	str = es->str;				/* keep the output buffer ... */
+
+	pfree(es);					/* ... and free only the state struct */
+	return str;
+}
+
+
+/* Signal handler for PROCSIG_CMD_STATUS_INFO: only sets flags; ProcessInterrupts() does the work. */
+void
+HandleCmdStatusInfoInterrupt(void)
+{
+	CmdStatusInfoPending = true;	/* tested in ProcessInterrupts() */
+	InterruptPending = true;
+
+	SetLatch(MyLatch);				/* presumably wakes this process if it is waiting on its latch */
+}
+
+
+/*
+ * Build the reply for the request in *slot and record the outcome in
+ * slot->result_code.  Returns a StringInfo the caller must free, or NULL.
+ */
+static StringInfo
+ProcessCmdStatusSlot(volatile CmdStatusSlot *slot)
+{
+	StringInfo		result = NULL;
+
+	/* Show some optimism. */
+	slot->result_code = CMD_STATUS_RESULT_SUCCESS;
+
+	if (ActivePortal)
+	{
+		switch (slot->request_type)
+		{
+			case CMD_STATUS_REQUEST_EXPLAIN:
+				/* A portal may have no QueryDesc; never explain a NULL one. */
+				if (ActivePortal->queryDesc != NULL)
+					result = explain_query(ActivePortal->queryDesc);
+				else
+					slot->result_code = CMD_STATUS_RESULT_FAILURE;
+				break;
+
+			case CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE:
+				{
+					StringInfo		str;
+					CmdInfoStack   *query;
+
+					result = makeStringInfo();
+					if (current_query_stack != NULL)
+					{
+						for (query = current_query_stack; query != NULL;
+							 query = query->parent)
+						{
+							str = explain_query(query->query_desc);
+							appendBinaryStringInfo(result, str->data, str->len);
+							pfree(str->data);
+							pfree(str);
+						}
+					}
+					else
+						slot->result_code = CMD_STATUS_RESULT_BACKEND_IDLE;	/* XXX */
+					break;
+				}
+
+			case CMD_STATUS_REQUEST_QUERY_TEXT:
+				result = makeStringInfo();
+				appendStringInfoString(result, ActivePortal->sourceText);
+				break;
+
+			case CMD_STATUS_REQUEST_PROGRESS_TAG:
+				if (ActivePortal->commandTag != NULL)
+				{
+					result = makeStringInfo();
+
+					if (ActivePortal->queryDesc != NULL &&
+						ActivePortal->queryDesc->estate != NULL)
+					{
+						appendStringInfo(result, "%s %u",
+										 ActivePortal->commandTag,
+										 ActivePortal->queryDesc->estate->es_processed);
+					}
+					else
+					{
+						/* no progress available, at least show the command tag */
+						appendStringInfoString(result, ActivePortal->commandTag);
+					}
+				}
+				else
+				{
+					slot->result_code = CMD_STATUS_RESULT_NO_COMMAND_TAG;
+				}
+				break;
+		}
+	}
+	else
+	{
+		slot->result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+	}
+
+	return result;
+}
+
+static void		/* make prevResourceOwner current again, deleting the owner it replaces */
+RestoreResourceOwner(ResourceOwner prevResourceOwner)
+{
+	if (prevResourceOwner != CurrentResourceOwner)	/* nothing to do when the owner was never switched */
+	{
+		ResourceOwner	res_owner = CurrentResourceOwner;
+
+		CurrentResourceOwner = prevResourceOwner;
+		ResourceOwnerDelete(res_owner);
+	}
+}
+
+void		/* called from ProcessInterrupts() when CmdStatusInfoPending is set */
+ProcessCmdStatusInfoRequest(void)
+{
+	elog(LOG, "+ProcessCmdStatusInfoRequest: %d", CmdStatusInfoPending);
+
+	while (CmdStatusInfoPending)	/* repeat if the flag was set again during a pass */
+	{
+		int		i;
+
+		CmdStatusInfoPending = false;
+		/* Scan every slot for requests addressed to this process. */
+		for (i = 0; i < NumCmdStatusSlots; i++)
+		{
+			volatile CmdStatusSlot *slot = &CmdStatusSlots[i];
+
+			if (slot->sender_css_pid == MyProcPid)
+			{
+				elog(LOG, "found receiver slot: PID %d, valid: %d, processed: %d",
+					 slot->css_pid, slot->is_valid, slot->is_processed);
+			/* NOTE(review): the block below is nested inside the PID check above, despite its indentation. */
+			if (slot->is_valid && !slot->is_processed)
+			{
+				ResourceOwner	prevResourceOwner = CurrentResourceOwner;
+				dsm_segment	   *seg = NULL;
+				shm_toc		   *toc = NULL;
+				shm_mq_handle  *output;
+				shm_mq		   *mq;
+
+				/* Ensure valid resource owner for access to dsm. */
+				if (CurrentResourceOwner == NULL)
+					CurrentResourceOwner = ResourceOwnerCreate(NULL, "ProcessCmdStatusInfoRequest");
+
+				seg = dsm_attach(slot->dsm_seg_handle);
+				if (seg == NULL)
+				{
+					elog(LOG, "unable to map dynamic memory segment for command status");
+
+					slot->is_processed = true;
+					slot->is_valid = false;
+
+					RestoreResourceOwner(prevResourceOwner);
+					continue;		/* go on with the next slot */
+				}
+
+				toc = shm_toc_attach(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg));
+				if (toc == NULL)
+				{
+					elog(LOG, "bad magic in dynamic memory segment for command status");
+
+					slot->is_processed = true;
+					slot->is_valid = false;
+
+					dsm_detach(seg);
+
+					RestoreResourceOwner(prevResourceOwner);
+					continue;		/* go on with the next slot */
+				}
+
+				mq = shm_toc_lookup(toc, 0);	/* key 0 is the queue inserted by pg_cmdstatus() */
+
+				elog(LOG, "sender: attaching mq");
+				output = shm_mq_attach(mq, seg, NULL);
+
+				PG_TRY();
+				{
+					StringInfo		payload;
+					int				res;
+
+					payload = ProcessCmdStatusSlot(slot);
+					if (payload != NULL)
+					{
+						elog(LOG, "sender: sending payload");
+
+						slot->is_processed = true;	/* marked as handled before the reply is sent */
+
+						res = shm_mq_send(output, payload->len, payload->data, false);
+						if (res != SHM_MQ_SUCCESS)
+						{
+							elog(LOG, "cannot send command status to backend with PID %d",
+								 slot->css_pid);
+							slot->is_valid = false;
+						}
+						pfree(payload->data);
+						pfree(payload);
+					}
+					else
+					{
+						slot->is_valid = false;	/* no reply; the reason is in slot->result_code */
+					}
+
+					elog(LOG, "sender: detaching mq");
+					shm_mq_detach(mq);
+					dsm_detach(seg);
+
+					RestoreResourceOwner(prevResourceOwner);
+				}
+				PG_CATCH();
+				{
+					slot->is_processed = true;
+					slot->is_valid = false;
+
+					elog(LOG, "sender: detaching mq (ON ERROR)");
+					shm_mq_detach(mq);
+					dsm_detach(seg);
+
+					RestoreResourceOwner(prevResourceOwner);
+
+					PG_RE_THROW();
+				}
+				PG_END_TRY();
+			}
+			}
+		}
+	}
+
+	elog(LOG, "-ProcessCmdStatusInfoRequest");
+}
+
+static void		/* report, in the requesting backend, why no reply was received from target_pid */
+report_result_code(CmdStatusInfoResultCode result, pid_t target_pid)
+{
+	switch (result)
+	{
+		case CMD_STATUS_RESULT_BACKEND_IDLE:
+			elog(INFO, "no command is currently running in backend with PID %d",
+				 target_pid);
+			break;
+
+		case CMD_STATUS_RESULT_NO_COMMAND_TAG:
+			elog(WARNING, "no command tag found for the query in backend with PID %d",
+				 target_pid);
+			break;
+
+		default:		/* every other code, including SUCCESS and FAILURE, raises an error */
+			elog(ERROR, "general command status request failure");
+			break;
+	}
+}
+
+/*
+ * Try to get the status of the command running in another process.
+ * The reply text is returned as one row per line.
+ *
+ * FUNCTION pg_cmdstatus(pid, request_type)
+ * RETURNS SETOF text
+ */
+Datum
+pg_cmdstatus(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo	   *rsinfo	= (ReturnSetInfo *) fcinfo->resultinfo;
+	Tuplestorestate	   *tupstore;
+	TupleDesc			tupdesc;
+	MemoryContext		per_query_ctx;
+	MemoryContext		oldcontext;
+	pid_t				target_pid = (pid_t) PG_GETARG_INT32(0);
+	int					request_type = PG_GETARG_INT32(1);
+	PGPROC			   *proc;
+	shm_toc_estimator	estimator;
+	Size				segsize;
+	dsm_segment		   *seg = NULL;
+	shm_toc			   *toc = NULL;
+	shm_mq			   *mq = NULL;
+	shm_mq_handle	   *input = NULL;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (request_type < 1 || request_type > CMD_STATUS_MAX_REQUEST)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("unknown command status request")));
+
+	/* need to build tuplestore in query context */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(false, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	if (target_pid == MyProcPid)
+		ereport(ERROR,
+				(errmsg("backend cannot query command status of itself")));
+
+	/* verify access to target_pid */
+	proc = BackendPidGetProc(target_pid);
+
+	if (proc == NULL)
+		ereport(ERROR,
+				(errmsg("PID %d is not a PostgreSQL server process", target_pid)));
+
+	if (!(superuser() || proc->roleId == GetUserId()))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("must be superuser or have the same role to query command status of other server processes"))));
+
+	/* prepare shared dsm segment */
+	shm_toc_initialize_estimator(&estimator);
+	shm_toc_estimate_keys(&estimator, 1);
+	shm_toc_estimate_chunk(&estimator, CMD_STATUS_BUFFER_SIZE);
+	segsize = shm_toc_estimate(&estimator);
+
+	seg = dsm_create(segsize, 0);
+
+	toc = shm_toc_create(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg),
+						 segsize);
+
+	/* prepare basic structures */
+	mq = shm_mq_create(shm_toc_allocate(toc, CMD_STATUS_BUFFER_SIZE),
+					   CMD_STATUS_BUFFER_SIZE);
+
+	shm_mq_set_receiver(mq, MyProc);	/* we read the reply ... */
+	shm_mq_set_sender(mq, proc);		/* ... that the target backend writes */
+	shm_toc_insert(toc, 0, mq);
+
+	PG_TRY();
+	{
+		shm_mq_result	res;
+		char		   *data = NULL;
+		Size			length;
+
+		elog(LOG, "receiver: attaching mq");
+		input = shm_mq_attach(mq, seg, NULL);
+
+		elog(LOG, "receiver: signalling PID %d", target_pid);
+		if (SendCmdStatusInfoSignal(target_pid, proc->backendId,
+									dsm_segment_handle(seg), request_type))
+			elog(ERROR, "could not signal backend with PID %d", target_pid);
+
+		/* try to read from queue */
+		res = shm_mq_receive(input, &length, (void **) &data, false);
+		if (res == SHM_MQ_SUCCESS)
+		{
+			const char	   *p = data;
+			const char	   *end = data + length;	/* payload is not NUL-terminated */
+
+			/* break into tuples on newline */
+			while (p < end)
+			{
+				const char *q = memchr(p, '\n', end - p);
+				Size		len = q ? q - p : end - p;
+				Datum		value;
+				HeapTuple	tuple;
+				bool		isnull = false;
+
+				value = PointerGetDatum(cstring_to_text_with_len(p, len));
+
+				tuple = heap_form_tuple(tupdesc, &value, &isnull);
+				tuplestore_puttuple(tupstore, tuple);
+
+				if (!q)
+					break;
+
+				p += len + 1;
+			}
+		}
+		else
+		{
+			report_result_code(MyCmdStatusSlot->result_code, target_pid);
+		}
+
+		/* clean up and return the tuplestore */
+		tuplestore_donestoring(tupstore);
+
+		InvalidateCmdStatusSlot();
+
+		elog(LOG, "receiver: detaching mq");
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+	}
+	PG_CATCH();
+	{
+		InvalidateCmdStatusSlot();
+
+		elog(LOG, "receiver: detaching mq (ON ERROR)");
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 23e594e..2269853 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -29,6 +29,7 @@ ProtocolVersion FrontendProtocol;
 volatile bool InterruptPending = false;
 volatile bool QueryCancelPending = false;
 volatile bool ProcDiePending = false;
+volatile bool CmdStatusInfoPending = false;
 volatile bool ClientConnectionLost = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 7b19714..7e7ba77 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -50,6 +50,7 @@
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
+#include "utils/cmdstatus.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -588,6 +589,9 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 	/* Now that we have a BackendId, we can participate in ProcSignal */
 	ProcSignalInit(MyBackendId);
 
+	/* Init CmdStatus slot */
+	CmdStatusInit(MyBackendId);
+
 	/*
 	 * Also set up timeout handlers needed for backend operation.  We need
 	 * these in every case except bootstrap.
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ddf7c67..d083aaf 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3130,6 +3130,8 @@ DESCR("get OID of current session's temp schema, if any");
 DATA(insert OID = 2855 (  pg_is_other_temp_schema	PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ pg_is_other_temp_schema _null_ _null_ _null_ ));
 DESCR("is schema another session's temp schema?");
 
+DATA(insert OID = 4099 ( pg_cmdstatus		PGNSP PGUID 12 1 100 0 0 f f f f f t s 2 0 25 "23 23" _null_ _null_  _null_  _null_ _null_ pg_cmdstatus _null_ _null_ _null_ ));
+DESCR("returns information about another process");
 DATA(insert OID = 2171 ( pg_cancel_backend		PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_cancel_backend _null_ _null_ _null_ ));
 DESCR("cancel a server process' current query");
 DATA(insert OID = 2096 ( pg_terminate_backend		PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index e0cc69f..5f03f63 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -80,6 +80,7 @@
 extern PGDLLIMPORT volatile bool InterruptPending;
 extern PGDLLIMPORT volatile bool QueryCancelPending;
 extern PGDLLIMPORT volatile bool ProcDiePending;
+extern PGDLLIMPORT volatile bool CmdStatusInfoPending;
 
 extern volatile bool ClientConnectionLost;
 
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index af1a0cd..cd856e5 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,8 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+	PROCSIG_CMD_STATUS_INFO,
+
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
 
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index fc1679e..605612e 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1243,6 +1243,9 @@ extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS);
 /* catalog/objectaddress.c */
 extern Datum pg_get_object_address(PG_FUNCTION_ARGS);
 
+/* utils/adt/cmdstatus.c */
+extern Datum pg_cmdstatus(PG_FUNCTION_ARGS);
+
 /* commands/constraint.c */
 extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
 
diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h
new file mode 100644
index 0000000..85627a57
--- /dev/null
+++ b/src/include/utils/cmdstatus.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.h
+ *	  Declarations for command status interrupt handling.
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * src/include/utils/cmdstatus.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CMDSTATUS_H
+#define CMDSTATUS_H
+
+extern Size CmdStatusShmemSize(void);		/* shared memory needed for the slot array */
+extern void CmdStatusShmemInit(void);		/* set up the slot array in shared memory */
+extern void CmdStatusInit(int css_idx);		/* claim a slot; css_idx is 1-based */
+
+extern void HandleCmdStatusInfoInterrupt(void);	/* called from the SIGUSR1 handler */
+extern void ProcessCmdStatusInfoRequest(void);	/* called from ProcessInterrupts() */
+
+#endif   /* CMDSTATUS_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to