On Mon, Sep 14, 2015 at 7:27 PM, Pavel Stehule <pavel.steh...@gmail.com>
wrote:

>
> 2015-09-14 18:46 GMT+02:00 Shulgin, Oleksandr <
> oleksandr.shul...@zalando.de>:
>
>>
>> I have a radical proposal to remove the need for locking: make the
>> CmdStatusSlot struct consist of a mere dsm_handle and move all the required
>> metadata like sender_pid, request_type, etc. into the shared memory segment
>> itself.
>>
>> If we allow the only the requesting process to update the slot (that is
>> the handle value itself) this removes the need for locking between sender
>> and receiver.
>>
>> The sender will walk through the slots looking for a non-zero dsm handle
>> (according to dsm_create() implementation 0 is considered an invalid
>> handle), and if it finds a valid one, it will attach and look inside, to
>> check if it's destined for this process ID.  At first that might sound
>> strange, but I would expect 99% of the time that the only valid slot would
>> be for the process that has been just signaled.
>>
>> The sender process will then calculate the response message, update the
>> result_code in the shared memory segment and finally send the message
>> through the queue.  If the receiver has since detached we get a detached
>> result code and bail out.
>>
>> Clearing the slot after receiving the message should be the requesting
>> process' responsibility.  This way the receiver only writes to the slot and
>> the sender only reads from it.
>>
>> By the way, is it safe to assume atomic read/writes of dsm_handle
>> (uint32)?  I would be surprised if not.
>>
>
> I don't see any reason why it should not to work - only few processes will
> wait for data - so lost attach/detach shm operations will not be too much.
>

Please see attached for implementation of this approach.  The most
surprising thing is that it actually works :)

One problem still remains with the process requesting information when the
target process exits before it can have a chance to handle the signal.  The
requesting process then waits forever, because nobody attaches/detaches the
queue.  We've discussed this before and looks like we need to introduce a
timeout parameter to pg_cmdstatus()...

--
Alex
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 32ac58f..2e3beaf 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -43,6 +43,7 @@
 #include "storage/procsignal.h"
 #include "storage/sinvaladt.h"
 #include "storage/spin.h"
+#include "utils/cmdstatus.h"
 
 
 shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -139,6 +140,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 		size = add_size(size, BTreeShmemSize());
 		size = add_size(size, SyncScanShmemSize());
 		size = add_size(size, AsyncShmemSize());
+		size = add_size(size, CmdStatusShmemSize());
 #ifdef EXEC_BACKEND
 		size = add_size(size, ShmemBackendArraySize());
 #endif
@@ -243,6 +245,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
 	ReplicationOriginShmemInit();
 	WalSndShmemInit();
 	WalRcvShmemInit();
+	CmdStatusShmemInit();
 
 	/*
 	 * Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 0abde43..e637be1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -26,6 +26,7 @@
 #include "storage/shmem.h"
 #include "storage/sinval.h"
 #include "tcop/tcopprot.h"
+#include "utils/cmdstatus.h"
 
 
 /*
@@ -296,6 +297,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
 	if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
 		RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
 
+	if (CheckProcSignal(PROCSIG_CMD_STATUS_INFO))
+		HandleCmdStatusInfoInterrupt();
+
 	if (set_latch_on_sigusr1)
 		SetLatch(MyLatch);
 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index d917af3..1a5b03c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -67,6 +67,7 @@
 #include "tcop/pquery.h"
 #include "tcop/tcopprot.h"
 #include "tcop/utility.h"
+#include "utils/cmdstatus.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
@@ -2991,6 +2992,9 @@ ProcessInterrupts(void)
 
 	if (ParallelMessagePending)
 		HandleParallelMessages();
+
+	if (CmdStatusInfoPending)
+		ProcessCmdStatusInfoRequest();
 }
 
 
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
index 3ed0b44..2c8687c 100644
--- a/src/backend/utils/adt/Makefile
+++ b/src/backend/utils/adt/Makefile
@@ -18,7 +18,7 @@ endif
 # keep this list arranged alphabetically or it gets to be a mess
 OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \
 	array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \
-	bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \
+	bool.o cash.o char.o cmdstatus.o date.o datetime.o datum.o dbsize.o domains.o \
 	encode.o enum.o expandeddatum.o \
 	float.o format_type.o formatting.o genfile.o \
 	geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \
diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c
new file mode 100644
index 0000000..6cb04f4
--- /dev/null
+++ b/src/backend/utils/adt/cmdstatus.c
@@ -0,0 +1,675 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.c
+ *	  Definitions for pg_cmdstatus function.
+ *
+ * Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *	  src/backend/utils/adt/cmdstatus.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/htup_details.h"
+#include "commands/explain.h"
+#include "lib/stringinfo.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/dest.h"
+#include "tcop/pquery.h"
+#include "utils/builtins.h"
+#include "utils/cmdstatus.h"
+
+
+typedef enum {
+	CMD_STATUS_REQUEST_EXPLAIN = 1,
+	CMD_STATUS_REQUEST_QUERY_TEXT = 2,
+	CMD_STATUS_REQUEST_PROGRESS_TAG = 3,
+	CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE = 4
+} CmdStatusInfoRequestType;
+
+#define CMD_STATUS_MAX_REQUEST CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE
+
+typedef enum {
+	CMD_STATUS_RESULT_FAILURE = -1,
+	CMD_STATUS_RESULT_SUCCESS = 0,
+	CMD_STATUS_RESULT_BACKEND_IDLE,
+	CMD_STATUS_RESULT_NO_DATA
+} CmdStatusInfoResultCode;
+
+/*
+ * We allocate an array the size of NumCmdStatusSlots of DSM handles in the
+ * shared memory.  This slots array is indexed by backend ID, so each slot can
+ * corresponds to at max. one backend at any given time.
+ *
+ * In order to actually query some backend for its status, the interested
+ * process will create a DSM segment, initialize a table of contents in it,
+ * fill the target process id and request type and create a shared memory
+ * queue in the segment.
+ *
+ * The interested process will then put the such initialized DSM segment's
+ * handle in its own slot and send the SIGUSR1 with the reason
+ * PROCSIG_CMD_STATUS_INFO to the process being queried.  The interested
+ * process will wait for the response to be delivered via the shared memory
+ * queue and the result code via a field in the DSM segment.
+ *
+ * The layout of a DSM segment is given by the following TOC keys enumeration:
+ */
+typedef enum {
+	CmdStatusSlotKeyTargetProcessID,
+	CmdStatusSlotKeyIsProcessedFlag,
+	CmdStatusSlotKeyRequestType,
+	CmdStatusSlotKeyResultCode,
+	CmdStatusSlotKeyMemoryQueue,
+
+	CmdStatusTotalSlotKeys
+} CmdStatusSlotKey;
+
+#define NumCmdStatusSlots	MaxBackends
+
+#define PG_CMD_STATUS_INFO_MAGIC	0x79fb2449
+#define CMD_STATUS_BUFFER_SIZE		1024
+
+/*
+ * These structs are allocated on the program stack as local variables in the
+ * ExecutorRun hook.  The top of stack is current_query_stack, see below.
+ *
+ * Capturing the execution stack allows us to inspect the inner-most running
+ * query as well as showing the complete backtrace.
+ */
+typedef struct CmdInfoStack {
+	QueryDesc			   *query_desc;
+	struct CmdInfoStack	   *parent;
+} CmdInfoStack;
+
+/* The array of slots pre-allocated on shared memory. */
+static volatile dsm_handle *CmdStatusSlots = NULL;
+
+/* Pointer into the slots array, to a slot assigned for this backend. */
+static volatile dsm_handle *MyCmdStatusSlot = NULL;
+
+static CmdInfoStack	   *current_query_stack = NULL;
+static int				query_stack_size = 0;  /* XXX not really used */
+
+static ExecutorRun_hook_type	prev_ExecutorRun = NULL;
+
+static void cmdstatus_ExecutorRun(QueryDesc *queryDesc,
+								  ScanDirection direction, long count);
+
+static void CleanupCmdStatusSlot(int status, Datum arg);
+static StringInfo ProcessCmdStatusSlot(CmdStatusInfoRequestType request_type,
+					 CmdStatusInfoResultCode *result_code);
+
+static StringInfo explain_query(QueryDesc *queryDesc);
+static void report_result_code(CmdStatusInfoResultCode result, pid_t target_pid);
+static void RestoreResourceOwner(ResourceOwner prevResourceOwner);
+
+
+Size
+CmdStatusShmemSize(void)
+{
+	return NumCmdStatusSlots * sizeof(dsm_handle);
+}
+
+void
+CmdStatusShmemInit(void)
+{
+	Size		size = CmdStatusShmemSize();
+	bool		found;
+
+	CmdStatusSlots = (dsm_handle *) ShmemInitStruct("CmdStatusSlots", size, &found);
+
+	if (!found)
+		MemSet(CmdStatusSlots, 0, size);
+}
+
+static void
+CleanupCmdStatusSlot(int status, Datum arg)
+{
+	Assert(MyCmdStatusSlot != NULL);
+
+	*MyCmdStatusSlot = 0;	/* just in case */
+
+	MyCmdStatusSlot = NULL;
+}
+
+/*
+ * CmdStatusInit
+ *		Initialize MyCmdStatusSlot
+ *
+ * The passed index should be MyBackendId.
+ */
+void
+CmdStatusInit(int css_idx)
+{
+	MyCmdStatusSlot = &CmdStatusSlots[css_idx - 1];
+
+	on_shmem_exit(CleanupCmdStatusSlot, (Datum) 0);
+
+	/* also install executor hooks */
+	prev_ExecutorRun = ExecutorRun_hook;
+	ExecutorRun_hook = cmdstatus_ExecutorRun;
+}
+
+/*
+ * The executor hook.
+ *
+ * Accumulates the query descriptors on the program stack and takes care of
+ * popping the current frame when leaving the function ab-/normally.
+ */
+static void
+cmdstatus_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+{
+	CmdInfoStack	current;
+
+	current.query_desc = queryDesc;
+	current.parent = current_query_stack;
+
+	current_query_stack = &current;
+	query_stack_size++;
+
+	PG_TRY();
+	{
+		if (prev_ExecutorRun)
+			prev_ExecutorRun(queryDesc, direction, count);
+		else
+			standard_ExecutorRun(queryDesc, direction, count);
+
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;
+		current_query_stack = current.parent;
+	}
+	PG_CATCH();
+	{
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;
+		current_query_stack = current.parent;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+/* Produce an explain plan of a single query descriptor. */
+static StringInfo
+explain_query(QueryDesc *queryDesc)
+{
+	StringInfo		str;
+	ExplainState   *es;
+
+	es = NewExplainState();
+	es->analyze = false;
+	es->verbose = false;
+	es->buffers = false;
+	es->format = EXPLAIN_FORMAT_TEXT;
+
+	ExplainBeginOutput(es);
+	/* XXX: appendStringInfo(es->str, "#%d ", depth); ? */
+	ExplainQueryText(es, queryDesc);
+	ExplainPrintPlan(es, queryDesc);
+	ExplainEndOutput(es);
+
+	str = es->str;
+
+	pfree(es);
+	return str;
+}
+
+
+/* signal handler for PROCSIG_CMD_STATUS_INFO */
+void
+HandleCmdStatusInfoInterrupt(void)
+{
+	CmdStatusInfoPending = true;
+	InterruptPending = true;
+
+	SetLatch(MyLatch);
+}
+
+/* Produce a response and set the result code for a single slot. */
+static StringInfo
+ProcessCmdStatusSlot(CmdStatusInfoRequestType request_type,
+					 CmdStatusInfoResultCode *result_code)
+{
+	StringInfo		result = NULL;
+
+	/* Show some optimism, overwrite with error code later if needed. */
+	*result_code = CMD_STATUS_RESULT_SUCCESS;
+
+	if (ActivePortal)
+	{
+		switch (request_type)
+		{
+			case CMD_STATUS_REQUEST_EXPLAIN:
+				if (ActivePortal->queryDesc != NULL)
+					result = explain_query(ActivePortal->queryDesc);
+				else if (current_query_stack != NULL)
+					result = explain_query(current_query_stack->query_desc);
+				else
+					*result_code = CMD_STATUS_RESULT_NO_DATA;
+				break;
+
+			case CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE:
+				{
+					StringInfo		str;
+					CmdInfoStack   *query;
+
+					result = makeStringInfo();
+
+					if (current_query_stack != NULL)
+					{
+						for (query = current_query_stack;
+							 query != NULL;
+							 query = query->parent)
+						{
+							str = explain_query(query->query_desc);
+							appendBinaryStringInfo(result, str->data, str->len);
+
+							pfree(str->data);
+							pfree(str);
+						}
+					}
+					else
+						*result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+					break;
+				}
+
+			case CMD_STATUS_REQUEST_QUERY_TEXT:
+				result = makeStringInfo();
+				appendStringInfoString(result, ActivePortal->sourceText);
+				break;
+
+			case CMD_STATUS_REQUEST_PROGRESS_TAG:
+				if (ActivePortal->commandTag != NULL)
+				{
+					result = makeStringInfo();
+
+					if (ActivePortal->queryDesc != NULL &&
+						ActivePortal->queryDesc->estate != NULL)
+					{
+						appendStringInfo(result, "%s %u",
+										 ActivePortal->commandTag,
+										 ActivePortal->queryDesc->estate->es_processed);
+					}
+					else
+					{
+						/* no progress available, at least show the command tag */
+						appendStringInfoString(result, ActivePortal->commandTag);
+					}
+				}
+				else
+					*result_code = CMD_STATUS_RESULT_NO_DATA;
+				break;
+		}
+	}
+	else
+		*result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+
+	return result;
+}
+
+static void
+RestoreResourceOwner(ResourceOwner prevResourceOwner)
+{
+	if (prevResourceOwner != CurrentResourceOwner)
+	{
+		ResourceOwner	res_owner = CurrentResourceOwner;
+
+		CurrentResourceOwner = prevResourceOwner;
+		ResourceOwnerDelete(res_owner);
+	}
+}
+
+/*
+ * Process all command status requests from other backends by scanning the
+ * whole array of slots looking for the requests targeted at us, which were
+ * not already processed by us during earlier rounds.
+ *
+ * Most of the times it is expected that the first valid slot we find is for
+ * us, so there's no penalty for actually attaching it and looking inside.
+ */
+void
+ProcessCmdStatusInfoRequest(void)
+{
+	ResourceOwner	prevResourceOwner = CurrentResourceOwner;
+
+	/* Ensure valid resource owner for access to dsm. */
+	if (CurrentResourceOwner == NULL)
+		CurrentResourceOwner = ResourceOwnerCreate(NULL, "ProcessCmdStatusInfoRequest");
+
+	/*
+	 * We might have been signaled again by another process while handling a
+	 * request, so re-check after going through all the slots.
+	 */
+	while (CmdStatusInfoPending)
+	{
+		int		i;
+
+		CmdStatusInfoPending = false;
+
+		for (i = 0; i < NumCmdStatusSlots; i++)
+		{
+			dsm_handle slot = CmdStatusSlots[i];
+
+			if (slot != 0)
+			{
+				dsm_segment	   *seg = NULL;
+				shm_toc		   *toc = NULL;
+				pid_t		   *target_pid_ptr;
+				bool		   *is_processed_flag_ptr;
+				CmdStatusInfoRequestType   *request_type_ptr;
+				CmdStatusInfoResultCode	   *result_code_ptr;
+				shm_mq		   *mq;
+				shm_mq_handle  *output;
+
+				seg = dsm_attach(slot);
+				if (seg == NULL)
+				{
+					elog(LOG, "unable to map dynamic memory segment for command status");
+					continue;
+				}
+
+				toc = shm_toc_attach(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg));
+				if (toc == NULL)
+				{
+					elog(LOG, "bad magic in dynamic memory segment for command status");
+
+					dsm_detach(seg);
+					continue;
+				}
+
+				target_pid_ptr = (pid_t *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyTargetProcessID);
+				is_processed_flag_ptr = (bool *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyIsProcessedFlag);
+
+				if (*target_pid_ptr != MyProcPid || *is_processed_flag_ptr)
+				{
+					dsm_detach(seg);
+					continue;
+				}
+
+				/* Set the flag early to avoid re-processing during next round. */
+				*is_processed_flag_ptr = true;
+
+				request_type_ptr = (CmdStatusInfoRequestType *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyRequestType);
+
+				result_code_ptr = (CmdStatusInfoResultCode *)
+					shm_toc_lookup(toc, CmdStatusSlotKeyResultCode);
+
+				mq = shm_toc_lookup(toc, CmdStatusSlotKeyMemoryQueue);
+				output = shm_mq_attach(mq, seg, NULL);
+
+				PG_TRY();
+				{
+					StringInfo		payload;
+					int				res;
+
+					payload = ProcessCmdStatusSlot(*request_type_ptr, result_code_ptr);
+					if (payload != NULL)
+					{
+						/*
+						 * Send the data, including null terminator.
+						 *
+						 * This blocks until the receiving side completes
+						 * reading or detaches from the queue.
+						 */
+						res = shm_mq_send(output, payload->len + 1, payload->data, false);
+						if (res != SHM_MQ_SUCCESS)
+							elog(LOG, "cannot send command status via shared memory queue");
+
+						pfree(payload->data);
+						pfree(payload);
+					}
+					/*
+					 * If there was no payload to send, we just detach from
+					 * the queue and the receiver should wake up.
+					 */
+
+					shm_mq_detach(mq);
+					dsm_detach(seg);
+				}
+				PG_CATCH();
+				{
+					/*
+					 * Detaching from the queue also wakes up the receiver, be
+					 * sure to update the slot status before detaching.
+					 */
+					shm_mq_detach(mq);
+					dsm_detach(seg);
+
+					RestoreResourceOwner(prevResourceOwner);	/* XXX remove if not re-throwing */
+
+					PG_RE_THROW();	/* XXX is it better to log/suppress the error here? */
+				}
+				PG_END_TRY();
+			}
+		}
+	}
+
+	RestoreResourceOwner(prevResourceOwner);
+}
+
+static void
+report_result_code(CmdStatusInfoResultCode result, pid_t target_pid)
+{
+	switch (result)
+	{
+		case CMD_STATUS_RESULT_BACKEND_IDLE:
+			elog(INFO, "no command is currently running in backend with PID %d",
+				 target_pid);
+			break;
+
+		case CMD_STATUS_RESULT_NO_DATA:
+			elog(WARNING, "no suitable data found for the query in backend with PID %d",
+				 target_pid);
+			break;
+
+		default:
+			elog(ERROR, "general command status request failure when querying backend with PID %d",
+				 target_pid);
+			break;
+	}
+}
+
+/*
+ * Try to get status of a command running in another process.
+ *
+ * FUNCTION pg_cmdstatus(pid, request_type)
+ * RETURNS SETOF text
+ */
+Datum
+pg_cmdstatus(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo	   *rsinfo	= (ReturnSetInfo *) fcinfo->resultinfo;
+	Tuplestorestate	   *tupstore;
+	TupleDesc			tupdesc;
+	MemoryContext		per_query_ctx;
+	MemoryContext		oldcontext;
+	pid_t				target_pid = (pid_t) PG_GETARG_INT32(0);
+	int					request_type = PG_GETARG_INT32(1);
+	PGPROC			   *target_proc;
+	shm_toc_estimator	estimator;
+	Size				segsize;
+	dsm_segment		   *seg;
+	shm_toc			   *toc;
+	pid_t			   *target_pid_ptr;
+	bool			   *is_processed_flag_ptr;
+	CmdStatusInfoRequestType   *request_type_ptr;
+	CmdStatusInfoResultCode	   *result_code_ptr;
+	shm_mq			   *mq;
+	shm_mq_handle	   *input;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (request_type < 1 || request_type > CMD_STATUS_MAX_REQUEST)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("unknown command status request")));
+
+	if (target_pid == MyProcPid)
+		ereport(ERROR,
+				(errmsg("backend cannot query command status of itself")));
+
+	/* verify access to target_pid */
+	target_proc = BackendPidGetProc(target_pid);
+
+	if (target_proc == NULL)
+		ereport(ERROR,
+				(errmsg("PID %d is not a PostgreSQL server process", target_pid)));
+
+	if (!(superuser() || target_proc->roleId == GetUserId()))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("must be superuser or have the same role to explain queries running in other server processes"))));
+
+	if (*MyCmdStatusSlot != 0)
+		ereport(ERROR,
+				(errmsg("pg_cmdstatus() call is already in progress")));
+
+	/* need to build tuplestore in query context */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(false, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* prepare shared dsm segment */
+	shm_toc_initialize_estimator(&estimator);
+	shm_toc_estimate_keys(&estimator, CmdStatusTotalSlotKeys);
+	shm_toc_estimate_chunk(&estimator, sizeof(pid_t));
+	shm_toc_estimate_chunk(&estimator, sizeof(bool));
+	shm_toc_estimate_chunk(&estimator, sizeof(CmdStatusInfoRequestType));
+	shm_toc_estimate_chunk(&estimator, sizeof(CmdStatusInfoResultCode));
+	shm_toc_estimate_chunk(&estimator, CMD_STATUS_BUFFER_SIZE);
+	segsize = shm_toc_estimate(&estimator);
+
+	seg = dsm_create(segsize, 0);
+
+	toc = shm_toc_create(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg), segsize);
+
+	target_pid_ptr = (pid_t *) shm_toc_allocate(toc, sizeof(pid_t));
+	*target_pid_ptr = target_pid;
+	shm_toc_insert(toc, CmdStatusSlotKeyTargetProcessID, target_pid_ptr);
+
+	is_processed_flag_ptr = (bool *) shm_toc_allocate(toc, sizeof(bool));
+	*is_processed_flag_ptr = false;
+	shm_toc_insert(toc, CmdStatusSlotKeyIsProcessedFlag, is_processed_flag_ptr);
+
+	request_type_ptr = (CmdStatusInfoRequestType *)
+		shm_toc_allocate(toc, sizeof(CmdStatusInfoRequestType));
+	*request_type_ptr = request_type;
+	shm_toc_insert(toc, CmdStatusSlotKeyRequestType, request_type_ptr);
+
+	result_code_ptr = (CmdStatusInfoResultCode *)
+		shm_toc_allocate(toc, sizeof(CmdStatusInfoResultCode));
+	*result_code_ptr = CMD_STATUS_RESULT_FAILURE;
+	shm_toc_insert(toc, CmdStatusSlotKeyResultCode, result_code_ptr);
+
+	mq = shm_mq_create(shm_toc_allocate(toc, CMD_STATUS_BUFFER_SIZE),
+					   CMD_STATUS_BUFFER_SIZE);
+	shm_toc_insert(toc, CmdStatusSlotKeyMemoryQueue, mq);
+
+	shm_mq_set_receiver(mq, MyProc);
+	shm_mq_set_sender(mq, target_proc);
+
+	PG_TRY();
+	{
+		shm_mq_result	res;
+		char		   *data = NULL;
+		Size			length;
+
+		input = shm_mq_attach(mq, seg, NULL);
+
+		*MyCmdStatusSlot = dsm_segment_handle(seg);
+
+		if (SendProcSignal(target_pid, PROCSIG_CMD_STATUS_INFO, target_proc->backendId))
+			elog(ERROR, "could not signal backend with PID %d", target_pid);
+
+		/*
+		 * This blocks until the sender process writes a message to the queue
+		 * or detaches from it.  In either case we wake up and process the
+		 * result.
+		 */
+		res = shm_mq_receive(input, &length, (void **) &data, false);
+		if (res == SHM_MQ_SUCCESS)
+		{
+			const char	   *p = data;
+
+			/* break into tuples on newline */
+			while (*p)
+			{
+				const char *q = strchr(p, '\n');
+				Size		len = q ? q - p : strlen(p);
+				Datum		value;
+				HeapTuple	tuple;
+				bool		isnull = false;
+
+				value = PointerGetDatum(cstring_to_text_with_len(p, len));
+
+				tuple = heap_form_tuple(tupdesc, &value, &isnull);
+				tuplestore_puttuple(tupstore, tuple);
+
+				if (!q)
+					break;
+
+				p += len + 1;
+			}
+		}
+		else
+		{
+			report_result_code(*result_code_ptr, target_pid);
+		}
+
+		/* clean up and return the tuplestore */
+		tuplestore_donestoring(tupstore);
+
+		*MyCmdStatusSlot = 0;
+
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+	}
+	PG_CATCH();
+	{
+		*MyCmdStatusSlot = 0;
+
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 23e594e..2269853 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -29,6 +29,7 @@ ProtocolVersion FrontendProtocol;
 volatile bool InterruptPending = false;
 volatile bool QueryCancelPending = false;
 volatile bool ProcDiePending = false;
+volatile bool CmdStatusInfoPending = false;
 volatile bool ClientConnectionLost = false;
 volatile uint32 InterruptHoldoffCount = 0;
 volatile uint32 QueryCancelHoldoffCount = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 7b19714..7e7ba77 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -50,6 +50,7 @@
 #include "storage/smgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
+#include "utils/cmdstatus.h"
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/memutils.h"
@@ -588,6 +589,9 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
 	/* Now that we have a BackendId, we can participate in ProcSignal */
 	ProcSignalInit(MyBackendId);
 
+	/* Init CmdStatus slot */
+	CmdStatusInit(MyBackendId);
+
 	/*
 	 * Also set up timeout handlers needed for backend operation.  We need
 	 * these in every case except bootstrap.
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ddf7c67..d083aaf 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3130,6 +3130,8 @@ DESCR("get OID of current session's temp schema, if any");
 DATA(insert OID = 2855 (  pg_is_other_temp_schema	PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ pg_is_other_temp_schema _null_ _null_ _null_ ));
 DESCR("is schema another session's temp schema?");
 
+DATA(insert OID = 4099 ( pg_cmdstatus		PGNSP PGUID 12 1 100 0 0 f f f f f t s 2 0 25 "23 23" _null_ _null_  _null_  _null_ _null_ pg_cmdstatus _null_ _null_ _null_ ));
+DESCR("returns information about another process");
 DATA(insert OID = 2171 ( pg_cancel_backend		PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_cancel_backend _null_ _null_ _null_ ));
 DESCR("cancel a server process' current query");
 DATA(insert OID = 2096 ( pg_terminate_backend		PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index e0cc69f..5f03f63 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -80,6 +80,7 @@
 extern PGDLLIMPORT volatile bool InterruptPending;
 extern PGDLLIMPORT volatile bool QueryCancelPending;
 extern PGDLLIMPORT volatile bool ProcDiePending;
+extern PGDLLIMPORT volatile bool CmdStatusInfoPending;
 
 extern volatile bool ClientConnectionLost;
 
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index af1a0cd..cd856e5 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,8 @@ typedef enum
 	PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
 	PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
 
+	PROCSIG_CMD_STATUS_INFO,
+
 	NUM_PROCSIGNALS				/* Must be last! */
 } ProcSignalReason;
 
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index fc1679e..605612e 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1243,6 +1243,9 @@ extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS);
 /* catalog/objectaddress.c */
 extern Datum pg_get_object_address(PG_FUNCTION_ARGS);
 
+/* utils/adt/cmdstatus.c */
+extern Datum pg_cmdstatus(PG_FUNCTION_ARGS);
+
 /* commands/constraint.c */
 extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
 
diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h
new file mode 100644
index 0000000..85627a57
--- /dev/null
+++ b/src/include/utils/cmdstatus.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.h
+ *	  Declarations for command status interrupt handling.
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * src/include/utils/cmdstatus.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CMDSTATUS_H
+#define CMDSTATUS_H
+
+extern Size CmdStatusShmemSize(void);
+extern void CmdStatusShmemInit(void);
+extern void CmdStatusInit(int css_idx);
+
+extern void HandleCmdStatusInfoInterrupt(void);
+extern void ProcessCmdStatusInfoRequest(void);
+
+#endif   /* CMDSTATUS_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to