On Mon, Sep 14, 2015 at 7:27 PM, Pavel Stehule <[email protected]>
wrote:
>
> 2015-09-14 18:46 GMT+02:00 Shulgin, Oleksandr <
> [email protected]>:
>
>>
>> I have a radical proposal to remove the need for locking: make the
>> CmdStatusSlot struct consist of a mere dsm_handle and move all the required
>> metadata like sender_pid, request_type, etc. into the shared memory segment
>> itself.
>>
>> If we allow only the requesting process to update the slot (that is
>> the handle value itself) this removes the need for locking between sender
>> and receiver.
>>
>> The sender will walk through the slots looking for a non-zero dsm handle
>> (according to dsm_create() implementation 0 is considered an invalid
>> handle), and if it finds a valid one, it will attach and look inside, to
>> check if it's destined for this process ID. At first that might sound
>> strange, but I would expect 99% of the time that the only valid slot would
>> be for the process that has been just signaled.
>>
>> The sender process will then calculate the response message, update the
>> result_code in the shared memory segment and finally send the message
>> through the queue. If the receiver has since detached we get a detached
>> result code and bail out.
>>
>> Clearing the slot after receiving the message should be the requesting
>> process' responsibility. This way the receiver only writes to the slot and
>> the sender only reads from it.
>>
>> By the way, is it safe to assume atomic read/writes of dsm_handle
>> (uint32)? I would be surprised if not.
>>
>
> I don't see any reason why it should not work - only a few processes will
> wait for data - so the wasted attach/detach shm operations will not cost much.
>
Please see attached for implementation of this approach. The most
surprising thing is that it actually works :)
One problem still remains with the process requesting information when the
target process exits before it has a chance to handle the signal. The
requesting process then waits forever, because nobody attaches to or detaches
from the queue. We've discussed this before, and it looks like we need to
introduce a timeout parameter to pg_cmdstatus()...
--
Alex
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 32ac58f..2e3beaf 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -43,6 +43,7 @@
#include "storage/procsignal.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"
+#include "utils/cmdstatus.h"
shmem_startup_hook_type shmem_startup_hook = NULL;
@@ -139,6 +140,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, BTreeShmemSize());
size = add_size(size, SyncScanShmemSize());
size = add_size(size, AsyncShmemSize());
+ size = add_size(size, CmdStatusShmemSize());
#ifdef EXEC_BACKEND
size = add_size(size, ShmemBackendArraySize());
#endif
@@ -243,6 +245,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
ReplicationOriginShmemInit();
WalSndShmemInit();
WalRcvShmemInit();
+ CmdStatusShmemInit();
/*
* Set up other modules that need some shared memory space
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c
index 0abde43..e637be1 100644
--- a/src/backend/storage/ipc/procsignal.c
+++ b/src/backend/storage/ipc/procsignal.c
@@ -26,6 +26,7 @@
#include "storage/shmem.h"
#include "storage/sinval.h"
#include "tcop/tcopprot.h"
+#include "utils/cmdstatus.h"
/*
@@ -296,6 +297,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS)
if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN))
RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN);
+ if (CheckProcSignal(PROCSIG_CMD_STATUS_INFO))
+ HandleCmdStatusInfoInterrupt();
+
if (set_latch_on_sigusr1)
SetLatch(MyLatch);
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index d917af3..1a5b03c 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -67,6 +67,7 @@
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "tcop/utility.h"
+#include "utils/cmdstatus.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/ps_status.h"
@@ -2991,6 +2992,9 @@ ProcessInterrupts(void)
if (ParallelMessagePending)
HandleParallelMessages();
+
+ if (CmdStatusInfoPending)
+ ProcessCmdStatusInfoRequest();
}
diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile
index 3ed0b44..2c8687c 100644
--- a/src/backend/utils/adt/Makefile
+++ b/src/backend/utils/adt/Makefile
@@ -18,7 +18,7 @@ endif
# keep this list arranged alphabetically or it gets to be a mess
OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \
array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \
- bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \
+ bool.o cash.o char.o cmdstatus.o date.o datetime.o datum.o dbsize.o domains.o \
encode.o enum.o expandeddatum.o \
float.o format_type.o formatting.o genfile.o \
geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \
diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c
new file mode 100644
index 0000000..6cb04f4
--- /dev/null
+++ b/src/backend/utils/adt/cmdstatus.c
@@ -0,0 +1,675 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.c
+ * Definitions for pg_cmdstatus function.
+ *
+ * Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ * src/backend/utils/adt/cmdstatus.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "funcapi.h"
+#include "miscadmin.h"
+
+#include "access/htup_details.h"
+#include "commands/explain.h"
+#include "lib/stringinfo.h"
+#include "storage/dsm.h"
+#include "storage/ipc.h"
+#include "storage/latch.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "storage/procsignal.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/dest.h"
+#include "tcop/pquery.h"
+#include "utils/builtins.h"
+#include "utils/cmdstatus.h"
+
+
+/*
+ * Kinds of information that pg_cmdstatus() can request from another backend.
+ * The numeric values are what callers pass as the integer request_type
+ * argument of pg_cmdstatus().
+ */
+typedef enum {
+ /* explain plan of the active portal's query, else of the innermost tracked query */
+ CMD_STATUS_REQUEST_EXPLAIN = 1,
+ /* source text of the active portal */
+ CMD_STATUS_REQUEST_QUERY_TEXT = 2,
+ /* command tag, followed by the executor's processed counter when available */
+ CMD_STATUS_REQUEST_PROGRESS_TAG = 3,
+ /* explain plans of every query on the tracked executor stack */
+ CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE = 4
+} CmdStatusInfoRequestType;
+
+/* Highest valid request type; pg_cmdstatus() rejects values outside 1..this. */
+#define CMD_STATUS_MAX_REQUEST CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE
+
+/*
+ * Outcome of a request, stored in the DSM segment under
+ * CmdStatusSlotKeyResultCode.  The requester inspects it only when no message
+ * was received through the queue.
+ */
+typedef enum {
+ /* value the requester stores before signaling the target */
+ CMD_STATUS_RESULT_FAILURE = -1,
+ /* value the target stores before it starts building a response */
+ CMD_STATUS_RESULT_SUCCESS = 0,
+ /* the target has no active portal (or no tracked query for a backtrace) */
+ CMD_STATUS_RESULT_BACKEND_IDLE,
+ /* the target is running something, but has nothing to report for this request */
+ CMD_STATUS_RESULT_NO_DATA
+} CmdStatusInfoResultCode;
+
+/*
+ * We allocate an array of NumCmdStatusSlots DSM handles in shared memory.
+ * The slots array is indexed by backend ID, so each slot corresponds to at
+ * most one backend at any given time.
+ *
+ * In order to actually query some backend for its status, the interested
+ * process will create a DSM segment, initialize a table of contents in it,
+ * fill in the target process ID and request type, and create a shared memory
+ * queue in the segment.
+ *
+ * The interested process will then put the handle of the DSM segment
+ * initialized this way into its own slot and send SIGUSR1 with the reason
+ * PROCSIG_CMD_STATUS_INFO to the process being queried.  The interested
+ * process will wait for the response to be delivered via the shared memory
+ * queue and the result code via a field in the DSM segment.
+ *
+ * The layout of a DSM segment is given by the following TOC keys enumeration:
+ */
+typedef enum {
+ /* pid_t: PID of the backend the request is addressed to */
+ CmdStatusSlotKeyTargetProcessID,
+ /* bool: set by the target once it has picked the request up */
+ CmdStatusSlotKeyIsProcessedFlag,
+ /* CmdStatusInfoRequestType: what is being asked for */
+ CmdStatusSlotKeyRequestType,
+ /* CmdStatusInfoResultCode: outcome reported by the target */
+ CmdStatusSlotKeyResultCode,
+ /* shm_mq: queue carrying the response text from target to requester */
+ CmdStatusSlotKeyMemoryQueue,
+
+ /* number of keys above; used to size the table of contents */
+ CmdStatusTotalSlotKeys
+} CmdStatusSlotKey;
+
+/* One slot per possible backend. */
+#define NumCmdStatusSlots MaxBackends
+
+/* Magic number identifying a command status table of contents in a DSM segment. */
+#define PG_CMD_STATUS_INFO_MAGIC 0x79fb2449
+/* Size in bytes of the chunk handed to shm_mq_create() for the response queue. */
+#define CMD_STATUS_BUFFER_SIZE 1024
+
+/*
+ * These structs are allocated on the program stack as local variables in the
+ * ExecutorRun hook. The top of stack is current_query_stack, see below.
+ *
+ * Capturing the execution stack allows us to inspect the inner-most running
+ * query as well as showing the complete backtrace.
+ */
+typedef struct CmdInfoStack {
+ /* query descriptor passed to the ExecutorRun hook for this frame */
+ QueryDesc *query_desc;
+ /* frame of the enclosing ExecutorRun call, or NULL for the outermost one */
+ struct CmdInfoStack *parent;
+} CmdInfoStack;
+
+/* The array of slots pre-allocated on shared memory. */
+static volatile dsm_handle *CmdStatusSlots = NULL;
+
+/* Pointer into the slots array, to a slot assigned for this backend. */
+static volatile dsm_handle *MyCmdStatusSlot = NULL;
+
+/* Innermost frame pushed by cmdstatus_ExecutorRun, or NULL when none is active. */
+static CmdInfoStack *current_query_stack = NULL;
+/* Number of frames currently on current_query_stack. */
+static int query_stack_size = 0; /* XXX not really used */
+
+/* ExecutorRun hook that was installed before ours; called from our hook. */
+static ExecutorRun_hook_type prev_ExecutorRun = NULL;
+
+/* Executor hook maintaining current_query_stack. */
+static void cmdstatus_ExecutorRun(QueryDesc *queryDesc,
+ ScanDirection direction, long count);
+
+/* Slot lifecycle and request processing on the target side. */
+static void CleanupCmdStatusSlot(int status, Datum arg);
+static StringInfo ProcessCmdStatusSlot(CmdStatusInfoRequestType request_type,
+ CmdStatusInfoResultCode *result_code);
+
+/* Small helpers. */
+static StringInfo explain_query(QueryDesc *queryDesc);
+static void report_result_code(CmdStatusInfoResultCode result, pid_t target_pid);
+static void RestoreResourceOwner(ResourceOwner prevResourceOwner);
+
+
+/*
+ * CmdStatusShmemSize
+ *		Report how much shared memory the command status slots array needs.
+ *
+ * The array holds one dsm_handle per slot.  The product is computed with
+ * mul_size() so that an overflow is reported as an error rather than silently
+ * wrapping around and under-allocating the array.
+ */
+Size
+CmdStatusShmemSize(void)
+{
+	return mul_size(NumCmdStatusSlots, sizeof(dsm_handle));
+}
+
+/*
+ * CmdStatusShmemInit
+ *		Locate (or create) the shared array of command status slots.
+ *
+ * When the array did not exist yet, every slot is zeroed; zero is the value
+ * the rest of this file treats as "no request posted".
+ */
+void
+CmdStatusShmemInit(void)
+{
+	bool		already_exists;
+	Size		nbytes = CmdStatusShmemSize();
+	dsm_handle *slots;
+
+	slots = (dsm_handle *) ShmemInitStruct("CmdStatusSlots", nbytes, &already_exists);
+	CmdStatusSlots = slots;
+
+	if (already_exists)
+		return;
+
+	/* freshly allocated: start with all slots empty */
+	MemSet(slots, 0, nbytes);
+}
+
+/*
+ * CleanupCmdStatusSlot
+ *		on_shmem_exit callback: empty this backend's slot and forget about it.
+ *
+ * Both arguments are unused; they are dictated by the callback signature.
+ */
+static void
+CleanupCmdStatusSlot(int status, Datum arg)
+{
+	volatile dsm_handle *my_slot;
+
+	Assert(MyCmdStatusSlot != NULL);
+
+	my_slot = MyCmdStatusSlot;
+	MyCmdStatusSlot = NULL;
+
+	/* normally already zero; reset it in case a request was left posted */
+	*my_slot = 0;
+}
+
+/*
+ * CmdStatusInit
+ *		Initialize MyCmdStatusSlot and install the executor hook.
+ *
+ * The passed index should be MyBackendId; it is 1-based, while the slots
+ * array is 0-based.  The slot is cleared again at shared-memory exit by
+ * CleanupCmdStatusSlot().
+ */
+void
+CmdStatusInit(int css_idx)
+{
+	/* guard against indexing outside of the shared slots array */
+	Assert(CmdStatusSlots != NULL);
+	Assert(css_idx >= 1 && css_idx <= NumCmdStatusSlots);
+
+	MyCmdStatusSlot = &CmdStatusSlots[css_idx - 1];
+
+	on_shmem_exit(CleanupCmdStatusSlot, (Datum) 0);
+
+	/* also install executor hooks, chaining to whatever was there before */
+	prev_ExecutorRun = ExecutorRun_hook;
+	ExecutorRun_hook = cmdstatus_ExecutorRun;
+}
+
+/*
+ * The executor hook.
+ *
+ * Pushes a frame describing queryDesc onto current_query_stack for the
+ * duration of the executor run, then delegates to the previously installed
+ * hook or to standard_ExecutorRun().  The frame lives on the program stack,
+ * so it is popped on both the normal and the error path before control
+ * leaves this function.
+ */
+static void
+cmdstatus_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction, long count)
+{
+	CmdInfoStack current;
+
+	current.query_desc = queryDesc;
+	current.parent = current_query_stack;
+
+	current_query_stack = &current;
+	query_stack_size++;
+
+	PG_TRY();
+	{
+		if (prev_ExecutorRun)
+			prev_ExecutorRun(queryDesc, direction, count);
+		else
+			standard_ExecutorRun(queryDesc, direction, count);
+
+		/* nested runs must have popped their own frames by now */
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		query_stack_size--;
+		current_query_stack = current.parent;
+	}
+	PG_CATCH();
+	{
+		Assert(current_query_stack == &current);
+		Assert(query_stack_size > 0);
+
+		/* pop our frame before the error propagates past this stack frame */
+		query_stack_size--;
+		current_query_stack = current.parent;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
+
+/*
+ * Produce an explain plan of a single query descriptor.
+ *
+ * The output is plain text without the analyze, verbose or buffers options
+ * and starts with the query text.  The returned StringInfo is the buffer
+ * owned by the explain state; only the state struct itself is released here,
+ * so the caller is left holding (and has to free) the buffer.
+ */
+static StringInfo
+explain_query(QueryDesc *queryDesc)
+{
+	ExplainState *state = NewExplainState();
+	StringInfo	output;
+
+	state->format = EXPLAIN_FORMAT_TEXT;
+	state->analyze = false;
+	state->verbose = false;
+	state->buffers = false;
+
+	ExplainBeginOutput(state);
+	/* XXX: appendStringInfo(es->str, "#%d ", depth); ? */
+	ExplainQueryText(state, queryDesc);
+	ExplainPrintPlan(state, queryDesc);
+	ExplainEndOutput(state);
+
+	/* hand the text buffer over to the caller, drop the rest */
+	output = state->str;
+	pfree(state);
+
+	return output;
+}
+
+
+/*
+ * Signal handler for PROCSIG_CMD_STATUS_INFO.
+ *
+ * Does no real work here: it only records that a request is pending and sets
+ * the process latch.  The request itself is served later, when
+ * ProcessInterrupts() sees CmdStatusInfoPending and calls
+ * ProcessCmdStatusInfoRequest().
+ */
+void
+HandleCmdStatusInfoInterrupt(void)
+{
+ CmdStatusInfoPending = true;
+ InterruptPending = true;
+
+ SetLatch(MyLatch);
+}
+
+/*
+ * Produce a response and set the result code for a single slot.
+ *
+ * request_type	what the requester asked for
+ * result_code	output: CMD_STATUS_RESULT_SUCCESS when a response was built,
+ *				otherwise the reason why there is none
+ *
+ * Returns a newly built StringInfo holding the response text, which the
+ * caller must free, or NULL when there is nothing to send.  A non-NULL result
+ * is returned only together with CMD_STATUS_RESULT_SUCCESS, so the requester
+ * never receives an empty message in place of a result code.
+ */
+static StringInfo
+ProcessCmdStatusSlot(CmdStatusInfoRequestType request_type,
+					 CmdStatusInfoResultCode *result_code)
+{
+	StringInfo	result = NULL;
+
+	/* Show some optimism, overwrite with error code later if needed. */
+	*result_code = CMD_STATUS_RESULT_SUCCESS;
+
+	if (ActivePortal == NULL)
+	{
+		*result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+		return NULL;
+	}
+
+	switch (request_type)
+	{
+		case CMD_STATUS_REQUEST_EXPLAIN:
+			if (ActivePortal->queryDesc != NULL)
+				result = explain_query(ActivePortal->queryDesc);
+			else if (current_query_stack != NULL)
+				result = explain_query(current_query_stack->query_desc);
+			else
+				*result_code = CMD_STATUS_RESULT_NO_DATA;
+			break;
+
+		case CMD_STATUS_REQUEST_EXPLAIN_BACKTRACE:
+			if (current_query_stack != NULL)
+			{
+				CmdInfoStack *query;
+
+				/*
+				 * Allocate the buffer only once we know there is something
+				 * to put into it; otherwise an empty message would be sent
+				 * and the requester would never look at the result code.
+				 */
+				result = makeStringInfo();
+
+				/* innermost query first, then outwards through the parents */
+				for (query = current_query_stack;
+					 query != NULL;
+					 query = query->parent)
+				{
+					StringInfo	str = explain_query(query->query_desc);
+
+					appendBinaryStringInfo(result, str->data, str->len);
+
+					pfree(str->data);
+					pfree(str);
+				}
+			}
+			else
+				*result_code = CMD_STATUS_RESULT_BACKEND_IDLE;
+			break;
+
+		case CMD_STATUS_REQUEST_QUERY_TEXT:
+			result = makeStringInfo();
+			appendStringInfoString(result, ActivePortal->sourceText);
+			break;
+
+		case CMD_STATUS_REQUEST_PROGRESS_TAG:
+			if (ActivePortal->commandTag != NULL)
+			{
+				result = makeStringInfo();
+
+				if (ActivePortal->queryDesc != NULL &&
+					ActivePortal->queryDesc->estate != NULL)
+				{
+					appendStringInfo(result, "%s %u",
+									 ActivePortal->commandTag,
+									 ActivePortal->queryDesc->estate->es_processed);
+				}
+				else
+				{
+					/* no progress available, at least show the command tag */
+					appendStringInfoString(result, ActivePortal->commandTag);
+				}
+			}
+			else
+				*result_code = CMD_STATUS_RESULT_NO_DATA;
+			break;
+
+		default:
+			/* the request type comes from shared memory: do not trust it */
+			*result_code = CMD_STATUS_RESULT_FAILURE;
+			break;
+	}
+
+	return result;
+}
+
+/*
+ * Make prevResourceOwner current again.
+ *
+ * If a different resource owner is current at this point, it is deleted
+ * after the switch; when the current owner already is prevResourceOwner
+ * there is nothing to do.
+ */
+static void
+RestoreResourceOwner(ResourceOwner prevResourceOwner)
+{
+	ResourceOwner temp_owner;
+
+	if (CurrentResourceOwner == prevResourceOwner)
+		return;
+
+	temp_owner = CurrentResourceOwner;
+	CurrentResourceOwner = prevResourceOwner;
+	ResourceOwnerDelete(temp_owner);
+}
+
+/*
+ * Process all command status requests from other backends by scanning the
+ * whole array of slots looking for the requests targeted at us, which were
+ * not already processed by us during earlier rounds.
+ *
+ * Most of the times it is expected that the first valid slot we find is for
+ * us, so there's no penalty for actually attaching it and looking inside.
+ *
+ * For every matching request the response is built by ProcessCmdStatusSlot()
+ * and, if there is one, sent through the shared memory queue found in the
+ * requester's DSM segment.  Segments that cannot be attached, carry the wrong
+ * magic number or lack one of the expected table-of-contents entries are
+ * logged and skipped.  An error raised while building or sending a response
+ * is re-thrown after the queue and the segment have been detached.
+ */
+void
+ProcessCmdStatusInfoRequest(void)
+{
+	ResourceOwner prevResourceOwner = CurrentResourceOwner;
+
+	/* Ensure valid resource owner for access to dsm. */
+	if (CurrentResourceOwner == NULL)
+		CurrentResourceOwner = ResourceOwnerCreate(NULL, "ProcessCmdStatusInfoRequest");
+
+	/*
+	 * We might have been signaled again by another process while handling a
+	 * request, so re-check after going through all the slots.
+	 */
+	while (CmdStatusInfoPending)
+	{
+		int			i;
+
+		CmdStatusInfoPending = false;
+
+		for (i = 0; i < NumCmdStatusSlots; i++)
+		{
+			dsm_handle	slot = CmdStatusSlots[i];
+			dsm_segment *seg;
+			shm_toc    *toc;
+			pid_t	   *target_pid_ptr;
+			bool	   *is_processed_flag_ptr;
+			CmdStatusInfoRequestType *request_type_ptr;
+			CmdStatusInfoResultCode *result_code_ptr;
+			shm_mq	   *mq;
+			shm_mq_handle *output;
+
+			/* zero means nobody has posted a request in this slot */
+			if (slot == 0)
+				continue;
+
+			seg = dsm_attach(slot);
+			if (seg == NULL)
+			{
+				elog(LOG, "unable to map dynamic memory segment for command status");
+				continue;
+			}
+
+			toc = shm_toc_attach(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg));
+			if (toc == NULL)
+			{
+				elog(LOG, "bad magic in dynamic memory segment for command status");
+
+				dsm_detach(seg);
+				continue;
+			}
+
+			target_pid_ptr = (pid_t *)
+				shm_toc_lookup(toc, CmdStatusSlotKeyTargetProcessID);
+			is_processed_flag_ptr = (bool *)
+				shm_toc_lookup(toc, CmdStatusSlotKeyIsProcessedFlag);
+			request_type_ptr = (CmdStatusInfoRequestType *)
+				shm_toc_lookup(toc, CmdStatusSlotKeyRequestType);
+			result_code_ptr = (CmdStatusInfoResultCode *)
+				shm_toc_lookup(toc, CmdStatusSlotKeyResultCode);
+			mq = shm_toc_lookup(toc, CmdStatusSlotKeyMemoryQueue);
+
+			/* a lookup that finds nothing must not be dereferenced below */
+			if (target_pid_ptr == NULL || is_processed_flag_ptr == NULL ||
+				request_type_ptr == NULL || result_code_ptr == NULL ||
+				mq == NULL)
+			{
+				elog(LOG, "incomplete table of contents in dynamic memory segment for command status");
+
+				dsm_detach(seg);
+				continue;
+			}
+
+			/* skip requests meant for somebody else or already handled */
+			if (*target_pid_ptr != MyProcPid || *is_processed_flag_ptr)
+			{
+				dsm_detach(seg);
+				continue;
+			}
+
+			/* Set the flag early to avoid re-processing during next round. */
+			*is_processed_flag_ptr = true;
+
+			output = shm_mq_attach(mq, seg, NULL);
+
+			PG_TRY();
+			{
+				StringInfo	payload;
+				shm_mq_result res;
+
+				payload = ProcessCmdStatusSlot(*request_type_ptr, result_code_ptr);
+				if (payload != NULL)
+				{
+					/*
+					 * Send the data, including null terminator.
+					 *
+					 * This blocks until the receiving side completes
+					 * reading or detaches from the queue.
+					 */
+					res = shm_mq_send(output, payload->len + 1, payload->data, false);
+					if (res != SHM_MQ_SUCCESS)
+						elog(LOG, "cannot send command status via shared memory queue");
+
+					pfree(payload->data);
+					pfree(payload);
+				}
+
+				/*
+				 * If there was no payload to send, we just detach from the
+				 * queue and the receiver should wake up.
+				 */
+				shm_mq_detach(mq);
+				dsm_detach(seg);
+			}
+			PG_CATCH();
+			{
+				/*
+				 * Detaching from the queue also wakes up the receiver, so
+				 * record the failure in the segment before detaching.
+				 */
+				*result_code_ptr = CMD_STATUS_RESULT_FAILURE;
+
+				shm_mq_detach(mq);
+				dsm_detach(seg);
+
+				RestoreResourceOwner(prevResourceOwner);	/* XXX remove if not re-throwing */
+
+				PG_RE_THROW();	/* XXX is it better to log/suppress the error here? */
+			}
+			PG_END_TRY();
+		}
+	}
+
+	RestoreResourceOwner(prevResourceOwner);
+}
+
+/*
+ * Tell the user why no response text was received from target_pid.
+ *
+ * An idle backend is reported at INFO level and missing data at WARNING
+ * level; every other result code raises an ERROR.
+ */
+static void
+report_result_code(CmdStatusInfoResultCode result, pid_t target_pid)
+{
+	if (result == CMD_STATUS_RESULT_BACKEND_IDLE)
+		elog(INFO, "no command is currently running in backend with PID %d",
+			 target_pid);
+	else if (result == CMD_STATUS_RESULT_NO_DATA)
+		elog(WARNING, "no suitable data found for the query in backend with PID %d",
+			 target_pid);
+	else
+		elog(ERROR, "general command status request failure when querying backend with PID %d",
+			 target_pid);
+}
+
+/*
+ * Try to get status of a command running in another process.
+ *
+ * FUNCTION pg_cmdstatus(pid, request_type)
+ * RETURNS SETOF text
+ *
+ * pid			PID of the backend to query; must not be our own
+ * request_type	one of the CmdStatusInfoRequestType values
+ *
+ * The caller must be a superuser or run under the same role as the target
+ * backend.  A DSM segment describing the request is created, its handle is
+ * published in this backend's slot and the target is signaled.  We then block
+ * on the shared memory queue until the target sends a message or detaches.
+ * A received message is split on newlines into one row per line; if no
+ * message arrives, the result code left in the segment is reported instead
+ * and no rows are returned.
+ *
+ * The slot is cleared and the segment detached on both the normal and the
+ * error path.
+ */
+Datum
+pg_cmdstatus(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	Tuplestorestate *tupstore;
+	TupleDesc	tupdesc;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	pid_t		target_pid = (pid_t) PG_GETARG_INT32(0);
+	int			request_type = PG_GETARG_INT32(1);
+	PGPROC	   *target_proc;
+	shm_toc_estimator estimator;
+	Size		segsize;
+	dsm_segment *seg;
+	shm_toc    *toc;
+	pid_t	   *target_pid_ptr;
+	bool	   *is_processed_flag_ptr;
+	CmdStatusInfoRequestType *request_type_ptr;
+	CmdStatusInfoResultCode *result_code_ptr;
+	shm_mq	   *mq;
+	shm_mq_handle *input;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	if (request_type < 1 || request_type > CMD_STATUS_MAX_REQUEST)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("unknown command status request")));
+
+	if (target_pid == MyProcPid)
+		ereport(ERROR,
+				(errmsg("backend cannot query command status of itself")));
+
+	/* verify access to target_pid */
+	target_proc = BackendPidGetProc(target_pid);
+
+	if (target_proc == NULL)
+		ereport(ERROR,
+				(errmsg("PID %d is not a PostgreSQL server process", target_pid)));
+
+	if (!(superuser() || target_proc->roleId == GetUserId()))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 (errmsg("must be superuser or have the same role to explain queries running in other server processes"))));
+
+	/* a non-zero slot means an earlier request of ours is still posted */
+	if (*MyCmdStatusSlot != 0)
+		ereport(ERROR,
+				(errmsg("pg_cmdstatus() call is already in progress")));
+
+	/* need to build tuplestore in query context */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(false, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	/* prepare shared dsm segment: one chunk per CmdStatusSlotKey entry */
+	shm_toc_initialize_estimator(&estimator);
+	shm_toc_estimate_keys(&estimator, CmdStatusTotalSlotKeys);
+	shm_toc_estimate_chunk(&estimator, sizeof(pid_t));
+	shm_toc_estimate_chunk(&estimator, sizeof(bool));
+	shm_toc_estimate_chunk(&estimator, sizeof(CmdStatusInfoRequestType));
+	shm_toc_estimate_chunk(&estimator, sizeof(CmdStatusInfoResultCode));
+	shm_toc_estimate_chunk(&estimator, CMD_STATUS_BUFFER_SIZE);
+	segsize = shm_toc_estimate(&estimator);
+
+	seg = dsm_create(segsize, 0);
+
+	toc = shm_toc_create(PG_CMD_STATUS_INFO_MAGIC, dsm_segment_address(seg), segsize);
+
+	target_pid_ptr = (pid_t *) shm_toc_allocate(toc, sizeof(pid_t));
+	*target_pid_ptr = target_pid;
+	shm_toc_insert(toc, CmdStatusSlotKeyTargetProcessID, target_pid_ptr);
+
+	is_processed_flag_ptr = (bool *) shm_toc_allocate(toc, sizeof(bool));
+	*is_processed_flag_ptr = false;
+	shm_toc_insert(toc, CmdStatusSlotKeyIsProcessedFlag, is_processed_flag_ptr);
+
+	request_type_ptr = (CmdStatusInfoRequestType *)
+		shm_toc_allocate(toc, sizeof(CmdStatusInfoRequestType));
+	*request_type_ptr = request_type;
+	shm_toc_insert(toc, CmdStatusSlotKeyRequestType, request_type_ptr);
+
+	/* stays at "failure" unless the target gets as far as overwriting it */
+	result_code_ptr = (CmdStatusInfoResultCode *)
+		shm_toc_allocate(toc, sizeof(CmdStatusInfoResultCode));
+	*result_code_ptr = CMD_STATUS_RESULT_FAILURE;
+	shm_toc_insert(toc, CmdStatusSlotKeyResultCode, result_code_ptr);
+
+	mq = shm_mq_create(shm_toc_allocate(toc, CMD_STATUS_BUFFER_SIZE),
+					   CMD_STATUS_BUFFER_SIZE);
+	shm_toc_insert(toc, CmdStatusSlotKeyMemoryQueue, mq);
+
+	shm_mq_set_receiver(mq, MyProc);
+	shm_mq_set_sender(mq, target_proc);
+
+	PG_TRY();
+	{
+		shm_mq_result res;
+		char	   *data = NULL;
+		Size		length;
+
+		input = shm_mq_attach(mq, seg, NULL);
+
+		/* publish the request only once the segment is fully set up */
+		*MyCmdStatusSlot = dsm_segment_handle(seg);
+
+		if (SendProcSignal(target_pid, PROCSIG_CMD_STATUS_INFO, target_proc->backendId))
+			elog(ERROR, "could not signal backend with PID %d", target_pid);
+
+		/*
+		 * This blocks until the sender process writes a message to the queue
+		 * or detaches from it.  In either case we wake up and process the
+		 * result.
+		 */
+		res = shm_mq_receive(input, &length, (void **) &data, false);
+		if (res == SHM_MQ_SUCCESS)
+		{
+			const char *p = data;
+
+			/*
+			 * The string functions below rely on the terminator the sender
+			 * is supposed to include in the message; refuse to scan a buffer
+			 * that does not end with one.
+			 */
+			if (length == 0 || data[length - 1] != '\0')
+				elog(ERROR, "malformed command status message from backend with PID %d",
+					 target_pid);
+
+			/* break into tuples on newline */
+			while (*p)
+			{
+				const char *q = strchr(p, '\n');
+				Size		len = q ? q - p : strlen(p);
+				Datum		value;
+				HeapTuple	tuple;
+				bool		isnull = false;
+
+				value = PointerGetDatum(cstring_to_text_with_len(p, len));
+
+				tuple = heap_form_tuple(tupdesc, &value, &isnull);
+				tuplestore_puttuple(tupstore, tuple);
+
+				/* no newline left: that was the last line */
+				if (!q)
+					break;
+
+				p += len + 1;
+			}
+		}
+		else
+		{
+			report_result_code(*result_code_ptr, target_pid);
+		}
+
+		/* clean up and return the tuplestore */
+		tuplestore_donestoring(tupstore);
+
+		*MyCmdStatusSlot = 0;
+
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+	}
+	PG_CATCH();
+	{
+		/* withdraw the request before letting the error propagate */
+		*MyCmdStatusSlot = 0;
+
+		shm_mq_detach(mq);
+		dsm_detach(seg);
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 23e594e..2269853 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -29,6 +29,7 @@ ProtocolVersion FrontendProtocol;
volatile bool InterruptPending = false;
volatile bool QueryCancelPending = false;
volatile bool ProcDiePending = false;
+volatile bool CmdStatusInfoPending = false;
volatile bool ClientConnectionLost = false;
volatile uint32 InterruptHoldoffCount = 0;
volatile uint32 QueryCancelHoldoffCount = 0;
diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c
index 7b19714..7e7ba77 100644
--- a/src/backend/utils/init/postinit.c
+++ b/src/backend/utils/init/postinit.c
@@ -50,6 +50,7 @@
#include "storage/smgr.h"
#include "tcop/tcopprot.h"
#include "utils/acl.h"
+#include "utils/cmdstatus.h"
#include "utils/fmgroids.h"
#include "utils/guc.h"
#include "utils/memutils.h"
@@ -588,6 +589,9 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username,
/* Now that we have a BackendId, we can participate in ProcSignal */
ProcSignalInit(MyBackendId);
+ /* Init CmdStatus slot */
+ CmdStatusInit(MyBackendId);
+
/*
* Also set up timeout handlers needed for backend operation. We need
* these in every case except bootstrap.
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index ddf7c67..d083aaf 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -3130,6 +3130,8 @@ DESCR("get OID of current session's temp schema, if any");
DATA(insert OID = 2855 ( pg_is_other_temp_schema PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ pg_is_other_temp_schema _null_ _null_ _null_ ));
DESCR("is schema another session's temp schema?");
+/* strict (arguments are read without NULL checks) and volatile (signals another backend) */
+DATA(insert OID = 4099 ( pg_cmdstatus PGNSP PGUID 12 1 100 0 0 f f f f t t v 2 0 25 "23 23" _null_ _null_ _null_ _null_ _null_ pg_cmdstatus _null_ _null_ _null_ ));
+DESCR("returns information about another process");
DATA(insert OID = 2171 ( pg_cancel_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_cancel_backend _null_ _null_ _null_ ));
DESCR("cancel a server process' current query");
DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ ));
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index e0cc69f..5f03f63 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -80,6 +80,7 @@
extern PGDLLIMPORT volatile bool InterruptPending;
extern PGDLLIMPORT volatile bool QueryCancelPending;
extern PGDLLIMPORT volatile bool ProcDiePending;
+extern PGDLLIMPORT volatile bool CmdStatusInfoPending;
extern volatile bool ClientConnectionLost;
diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h
index af1a0cd..cd856e5 100644
--- a/src/include/storage/procsignal.h
+++ b/src/include/storage/procsignal.h
@@ -41,6 +41,8 @@ typedef enum
PROCSIG_RECOVERY_CONFLICT_BUFFERPIN,
PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK,
+ PROCSIG_CMD_STATUS_INFO,
+
NUM_PROCSIGNALS /* Must be last! */
} ProcSignalReason;
diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h
index fc1679e..605612e 100644
--- a/src/include/utils/builtins.h
+++ b/src/include/utils/builtins.h
@@ -1243,6 +1243,9 @@ extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS);
/* catalog/objectaddress.c */
extern Datum pg_get_object_address(PG_FUNCTION_ARGS);
+/* utils/adt/cmdstatus.c */
+extern Datum pg_cmdstatus(PG_FUNCTION_ARGS);
+
/* commands/constraint.c */
extern Datum unique_key_recheck(PG_FUNCTION_ARGS);
diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h
new file mode 100644
index 0000000..85627a57
--- /dev/null
+++ b/src/include/utils/cmdstatus.h
@@ -0,0 +1,23 @@
+/*-------------------------------------------------------------------------
+ *
+ * cmdstatus.h
+ * Declarations for command status interrupt handling.
+ *
+ *
+ * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group
+ *
+ * src/include/utils/cmdstatus.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CMDSTATUS_H
+#define CMDSTATUS_H
+
+/* Shared memory sizing and setup for the command status slots array. */
+extern Size CmdStatusShmemSize(void);
+extern void CmdStatusShmemInit(void);
+/* Per-backend setup; css_idx should be MyBackendId. */
+extern void CmdStatusInit(int css_idx);
+
+/* Called from the SIGUSR1 handler when PROCSIG_CMD_STATUS_INFO is set. */
+extern void HandleCmdStatusInfoInterrupt(void);
+/* Called from ProcessInterrupts() when CmdStatusInfoPending is set. */
+extern void ProcessCmdStatusInfoRequest(void);
+
+#endif /* CMDSTATUS_H */
--
Sent via pgsql-hackers mailing list ([email protected])
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers