On Mon, Aug 31, 2015 at 12:35 PM, Pavel Stehule <pavel.steh...@gmail.com> wrote:
> >>> >>> http://www.postgresql.org/message-id/cafj8praxcs9b8abgim-zauvggqdhpzoarz5ysp1_nhv9hp8...@mail.gmail.com >>> >> >> Ah, thanks! Somehow I've missed this mail. You didn't add the patch to >> a commitfest back then I think? >> > > I had no time to finish this patch - there is few issues in signal > handling and returning back result - but still I want it :) - and what I > know - almost all other SQL db has similar functionality. > I've updated the patch for the current master and also added some unexpected parameters handling, so attached is a v2. I'd say we should hide the so-designed pg_cmdstatus() interface behind more friendly calls like pg_explain_backend() and pg_backend_progress() to give some naming examples, to remove the need for magic numbers in the second arg. What I've found missing in this approach is the insight into nested executor runs, so that if you're running a "SELECT my_func()", you only see this outer query in the pg_cmdstatus() output. With the auto_explain approach, by hooking into executor I was able to capture the nested queries and their plans as well. It's conceptually trivial to add some code to use the Executor hooks here, but I don't see any precedent for this except for contrib modules (auto_explain and pg_stat_statements), I'm just not sure if that would be OK-ish. And when we solve that, there is another problem of having a sane interface to query the nested plans. For a psql user, probably the most interesting would be the topmost (level=1) and the innermost (e.g. level=-1) plans. We might also want to provide a full nesting of plans in a structured format like JSON or... *cough* XML, for programs to consume and display nicely with folding and stuff. And the most interesting would be making instrumentation work with all of the above. I'm adding this to the next CF. -- Alex
diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index 0abde43..40db40d 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -26,6 +26,7 @@ #include "storage/shmem.h" #include "storage/sinval.h" #include "tcop/tcopprot.h" +#include "utils/cmdstatus.h" /* @@ -296,6 +297,9 @@ procsignal_sigusr1_handler(SIGNAL_ARGS) if (CheckProcSignal(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN)) RecoveryConflictInterrupt(PROCSIG_RECOVERY_CONFLICT_BUFFERPIN); + if (CheckProcSignal(PROCSIG_CMDSTATUS_INFO)) + HandleCmdStatusInfoInterrupt(); + if (set_latch_on_sigusr1) SetLatch(MyLatch); diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index ce4bdaf..5d5df58 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -67,6 +67,7 @@ #include "tcop/pquery.h" #include "tcop/tcopprot.h" #include "tcop/utility.h" +#include "utils/cmdstatus.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/ps_status.h" @@ -2991,6 +2992,9 @@ ProcessInterrupts(void) if (ParallelMessagePending) HandleParallelMessages(); + + if (CmdStatusInfoRequested) + ProcessCmdStatusInfoRequest(); } diff --git a/src/backend/utils/adt/Makefile b/src/backend/utils/adt/Makefile index 3ed0b44..2c8687c 100644 --- a/src/backend/utils/adt/Makefile +++ b/src/backend/utils/adt/Makefile @@ -18,7 +18,7 @@ endif # keep this list arranged alphabetically or it gets to be a mess OBJS = acl.o arrayfuncs.o array_expanded.o array_selfuncs.o \ array_typanalyze.o array_userfuncs.o arrayutils.o ascii.o \ - bool.o cash.o char.o date.o datetime.o datum.o dbsize.o domains.o \ + bool.o cash.o char.o cmdstatus.o date.o datetime.o datum.o dbsize.o domains.o \ encode.o enum.o expandeddatum.o \ float.o format_type.o formatting.o genfile.o \ geo_ops.o geo_selfuncs.o inet_cidr_ntop.o inet_net_pton.o int.o \ diff --git a/src/backend/utils/adt/cmdstatus.c b/src/backend/utils/adt/cmdstatus.c new file mode 100644 index 0000000..38c1947 --- /dev/null +++ b/src/backend/utils/adt/cmdstatus.c @@ -0,0 +1,508 @@ +/*------------------------------------------------------------------------- + * + * cmdstatus.c + * Definitions for pg_cmdstatus function. + * + * Copyright (c) 1996-2015, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/utils/adt/cmdstatus.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "funcapi.h" +#include "miscadmin.h" + +#include "access/htup_details.h" +#include "commands/explain.h" +#include "lib/stringinfo.h" +#include "storage/latch.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "tcop/dest.h" +#include "tcop/pquery.h" +#include "utils/builtins.h" +#include "utils/cmdstatus.h" + + +#define CMDINFO_SLOTS 100 +#define BUFFER_SIZE (8 * 1024) + +bool CmdStatusInfoRequested = false; + +typedef struct { + bool is_valid; + bool is_done; + int target_pid; + int sender_pid; + int request_type; + int result_code; +} CmdStatusInfoEntry; + +typedef struct { + LWLock *lock; /* protect slots - search/modification */ + CmdStatusInfoEntry *slots; + LWLock *buffer_lock; /* protect buffer handling */ + void *buffer; /* result data */ + Size buffer_size; + int target_pid; + int sender_pid; + bool buffer_is_free; /* buffer is generally available */ + bool buffer_holds_data; /* buffer holds a valid data */ +} CmdStatusInfo; + +static CmdStatusInfo *cmd_status_info = NULL; + +/* + * Prepare explain of query + * + */ +static StringInfo +explain_query(QueryDesc *queryDesc) +{ + ExplainState *es; + + es = NewExplainState(); + es->analyze = false; + es->verbose = false; + es->buffers = false; + es->format = EXPLAIN_FORMAT_TEXT; + + ExplainBeginOutput(es); + ExplainQueryText(es, queryDesc); + ExplainPrintPlan(es, queryDesc); + ExplainEndOutput(es); + + /* Remove last line break */ + if (es->str->len > 0 && es->str->data[es->str->len - 1] == '\n') + es->str->data[--es->str->len] = '\0'; + + return es->str; +} + +static CmdStatusInfo * +attach_shmem(CmdStatusInfo *cmd_status_info) +{ + bool found; + + if (cmd_status_info == NULL) + { + LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); + + cmd_status_info = (CmdStatusInfo *) ShmemInitStruct("cmdstatusinfo", + sizeof(CmdStatusInfo), + &found); + if (!found) + { + int i; + + cmd_status_info->lock = LWLockAssign(); + cmd_status_info->slots = ShmemAlloc(CMDINFO_SLOTS * sizeof(CmdStatusInfo)); + + for (i = 0; i < CMDINFO_SLOTS; i++) + { + cmd_status_info->slots[i].is_valid = false; + } + + cmd_status_info->buffer_lock = LWLockAssign(); + cmd_status_info->buffer = ShmemAlloc(BUFFER_SIZE); + cmd_status_info->buffer_is_free = true; + cmd_status_info->buffer_holds_data = true; + } + + LWLockRelease(AddinShmemInitLock); + } + + return cmd_status_info; +} + +/* + * write data to shm buffer - wait for free buffer + * + */ +static void +write_to_shm_buffer(int target_pid, int sender_pid, void *data, Size bytes) +{ + int loop = 0; + + cmd_status_info = attach_shmem(cmd_status_info); + + while (1) + { + LWLockAcquire(cmd_status_info->buffer_lock, LW_EXCLUSIVE); + + if (cmd_status_info->buffer_is_free) + { + cmd_status_info->target_pid = target_pid; + cmd_status_info->sender_pid = sender_pid; + cmd_status_info->buffer_is_free = false; + cmd_status_info->buffer_holds_data = true; + + cmd_status_info->buffer_size = bytes; + memcpy(cmd_status_info->buffer, data, bytes); + + LWLockRelease(cmd_status_info->buffer_lock); + break; + } + else + { + LWLockRelease(cmd_status_info->buffer_lock); + + if (loop++ % 100 == 0); + CHECK_FOR_INTERRUPTS(); + + if (loop > 100000) + elog(ERROR, "cannot to take buffer to send data"); + + pg_usleep(1000L); + } + } +} + +/* + * It read data from shm buffer, waits for data + * + */ +static void * +read_from_shm_buffer(int target_pid, int sender_pid, Size *bytes) +{ + void *result = NULL; + int loop = 0; + + cmd_status_info = attach_shmem(cmd_status_info); + + while (1) + { + LWLockAcquire(cmd_status_info->buffer_lock, LW_EXCLUSIVE); + + if (cmd_status_info->buffer_holds_data && + cmd_status_info->target_pid == target_pid && + cmd_status_info->sender_pid == sender_pid) + { + result = palloc(cmd_status_info->buffer_size); + memcpy(result, cmd_status_info->buffer, cmd_status_info->buffer_size); + *bytes = cmd_status_info->buffer_size; + + cmd_status_info->buffer_is_free = true; + cmd_status_info->buffer_holds_data = false; + + LWLockRelease(cmd_status_info->buffer_lock); + + break; + } + else + { + LWLockRelease(cmd_status_info->buffer_lock); + + if (loop++ % 100 == 0); + CHECK_FOR_INTERRUPTS(); + + pg_usleep(1000L); + } + } + + return result; +} + +/* signal handler for PROCSIG_CMDSTATUS_INFO */ +void +HandleCmdStatusInfoInterrupt(void) +{ + InterruptPending = true; + CmdStatusInfoRequested = true; + + SetLatch(MyLatch); +} + +void +ProcessCmdStatusInfoRequest(void) +{ + bool found = true; + + cmd_status_info = attach_shmem(cmd_status_info); + + /* search any request for current process */ + while (found) + { + int i; + CmdStatusInfoEntry *csie; + + found = false; + csie = NULL; + + /* take lock for slots */ + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + /* try to find any request in valid slots */ + for (i = 0; i < CMDINFO_SLOTS; i++) + { + csie = &(cmd_status_info->slots[i]); + + if (csie->is_valid && !csie->is_done && csie->target_pid == MyProcPid) + { + found = true; + break; + } + } + + LWLockRelease(cmd_status_info->lock); + + if (found) + { + /* process request */ + Assert(csie != NULL); + + csie->is_done = true; + csie->result_code = -1; + + if (ActivePortal) + { + if (csie->request_type == 1) + { + if (ActivePortal->queryDesc != NULL) + { + StringInfo str; + + str = explain_query(ActivePortal->queryDesc); + write_to_shm_buffer(csie->sender_pid, MyProcPid, (void *) str->data, str->len); + + pfree(str->data); + + csie->result_code = 0; + } + else + { +#define NO_QUERY_DESC_MSG "(the running query description could not be found)" + write_to_shm_buffer(csie->sender_pid, MyProcPid, + NO_QUERY_DESC_MSG, + sizeof(NO_QUERY_DESC_MSG) - 1); +#undef NO_QUERY_DESC_MSG + } + } + else if (csie->request_type == 2) + { + write_to_shm_buffer(csie->sender_pid, MyProcPid, + (void *) ActivePortal->sourceText, + strlen(ActivePortal->sourceText)); + csie->result_code = 0; + } + else if (csie->request_type == 3) + { + if (ActivePortal->commandTag != NULL) + { + if (ActivePortal->queryDesc != NULL && + ActivePortal->queryDesc->estate != NULL) + { + char completionTag[COMPLETION_TAG_BUFSIZE]; + + snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "%s %u", + ActivePortal->commandTag, + ActivePortal->queryDesc->estate->es_processed); + + write_to_shm_buffer(csie->sender_pid, MyProcPid, + (void *) completionTag, + strlen(completionTag)); + } + else + { + write_to_shm_buffer(csie->sender_pid, MyProcPid, + (void *) ActivePortal->commandTag, + strlen(ActivePortal->commandTag)); + } + + csie->result_code = 0; + } + else + { +#define NO_COMMAND_TAG_MSG "(a command tag is not set for the running query)" + write_to_shm_buffer(csie->sender_pid, MyProcPid, + NO_COMMAND_TAG_MSG, + sizeof(NO_COMMAND_TAG_MSG) - 1); +#undef NO_COMMAND_TAG_MSG + } + } + else + { +#define UNKNOWN_REQUEST_TYPE_MSG "(unknown request_type)" + write_to_shm_buffer(csie->sender_pid, MyProcPid, + UNKNOWN_REQUEST_TYPE_MSG, + sizeof(UNKNOWN_REQUEST_TYPE_MSG) - 1); +#undef UNKNOWN_REQUEST_TYPE_MSG + } + } + else + { +#define NO_QUERY_RUNNING_MSG "(no query is currently running)" + write_to_shm_buffer(csie->sender_pid, MyProcPid, + NO_QUERY_RUNNING_MSG, + sizeof(NO_QUERY_RUNNING_MSG) - 1); +#undef NO_QUERY_RUNNING_MSG + + } + } + } +} + + +static CmdStatusInfoEntry * +NewCmdStatusInfoEntry(int target_pid, int request_type) +{ + bool found = false; + CmdStatusInfoEntry *csie = NULL; + int i; + + cmd_status_info = attach_shmem(cmd_status_info); + + /* find unused slot */ + LWLockAcquire(cmd_status_info->lock, LW_EXCLUSIVE); + + for (i = 0; i < CMDINFO_SLOTS; i++) + { + csie = &(cmd_status_info->slots[i]); + + if (!csie->is_valid) + { + found = true; + break; + } + } + + LWLockRelease(cmd_status_info->lock); + + if (!found) + elog(ERROR, "there are not free slots for cmdstatusinfo now"); + + csie->is_valid = true; + csie->is_done = false; + csie->target_pid = target_pid; + csie->sender_pid = MyProcPid; + csie->request_type = request_type; + csie->result_code = 0; + + return csie; +} + +/* + * try to get status of command in another process + * + * FUNCTION get_cmdstatus(pid, request_type) + * RETURNS SETOF text + */ +Datum +pg_cmdstatus(PG_FUNCTION_ARGS) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + Tuplestorestate *tupstore; + TupleDesc tupdesc; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + int target_pid = PG_GETARG_INT32(0); + int request_type = PG_GETARG_INT32(1); + PGPROC *proc; + CmdStatusInfoEntry *csie; + + Size len; + void *data = NULL; + + /* check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not allowed in this context"))); + + /* need to build tuplestore in query context */ + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc); + tupstore = tuplestore_begin_heap(false, false, work_mem); + MemoryContextSwitchTo(oldcontext); + + csie = NewCmdStatusInfoEntry(target_pid, request_type); + + PG_TRY(); + { + /* verify access to target_pid */ + proc = BackendPidGetProc(target_pid); + + if (proc == NULL) + ereport(ERROR, + (errmsg("PID %d is not a PostgreSQL server process", target_pid))); + + if (!(superuser() || proc->roleId == GetUserId())) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + (errmsg("must be superuser or have the same role to cancel queries running in other server processes")))); + + if (SendProcSignal((pid_t) target_pid, PROCSIG_CMDSTATUS_INFO, InvalidBackendId) < 0) + elog(ERROR, "could not signal backend with PID %d", target_pid); + + while (1) + { + data = read_from_shm_buffer(MyProcPid, target_pid, &len); + if (len > 0) + { + Datum value; + HeapTuple tuple; + bool isnull = false; + Size processed = 0; + char *cursor = data; + + /* parse to rows */ + while (processed < len) + { + char *eol = strchr(cursor, '\n'); + + if (eol != NULL) + { + int line_size = eol - cursor; + + value = PointerGetDatum(cstring_to_text_with_len(cursor, line_size)); + cursor += line_size + 1; + processed += line_size + 1; + } + else + { + /* last line */ + value = PointerGetDatum(cstring_to_text_with_len(cursor, len - processed)); + processed = len; + } + + tuple = heap_form_tuple(tupdesc, &value, &isnull); + tuplestore_puttuple(tupstore, tuple); + } + + pfree(data); + } + + if (csie->is_done) + break; + } + + csie->is_valid = false; + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); + } + PG_CATCH(); + { + csie->is_valid = false; + PG_RE_THROW(); + } + PG_END_TRY(); + + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + return (Datum) 0; +} diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index ddf7c67..d083aaf 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -3130,6 +3130,8 @@ DESCR("get OID of current session's temp schema, if any"); DATA(insert OID = 2855 ( pg_is_other_temp_schema PGNSP PGUID 12 1 0 0 0 f f f f t f s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ pg_is_other_temp_schema _null_ _null_ _null_ )); DESCR("is schema another session's temp schema?"); +DATA(insert OID = 4099 ( pg_cmdstatus PGNSP PGUID 12 1 100 0 0 f f f f f t s 2 0 25 "23 23" _null_ _null_ _null_ _null_ _null_ pg_cmdstatus _null_ _null_ _null_ )); +DESCR("returns information about another process"); DATA(insert OID = 2171 ( pg_cancel_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_cancel_backend _null_ _null_ _null_ )); DESCR("cancel a server process' current query"); DATA(insert OID = 2096 ( pg_terminate_backend PGNSP PGUID 12 1 0 0 0 f f f f t f v 1 0 16 "23" _null_ _null_ _null_ _null_ _null_ pg_terminate_backend _null_ _null_ _null_ )); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index af1a0cd..ab1698c 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -41,6 +41,9 @@ typedef enum PROCSIG_RECOVERY_CONFLICT_BUFFERPIN, PROCSIG_RECOVERY_CONFLICT_STARTUP_DEADLOCK, + /* cmd status info */ + PROCSIG_CMDSTATUS_INFO, + NUM_PROCSIGNALS /* Must be last! */ } ProcSignalReason; diff --git a/src/include/utils/builtins.h b/src/include/utils/builtins.h index fc1679e..605612e 100644 --- a/src/include/utils/builtins.h +++ b/src/include/utils/builtins.h @@ -1243,6 +1243,9 @@ extern Datum pg_identify_object_as_address(PG_FUNCTION_ARGS); /* catalog/objectaddress.c */ extern Datum pg_get_object_address(PG_FUNCTION_ARGS); +/* utils/adt/cmdstatus.c */ +extern Datum pg_cmdstatus(PG_FUNCTION_ARGS); + /* commands/constraint.c */ extern Datum unique_key_recheck(PG_FUNCTION_ARGS); diff --git a/src/include/utils/cmdstatus.h b/src/include/utils/cmdstatus.h new file mode 100644 index 0000000..ff88ead --- /dev/null +++ b/src/include/utils/cmdstatus.h @@ -0,0 +1,21 @@ +/*------------------------------------------------------------------------- + * + * cmdstatus.h + * Declarations for command status interrupt handling. + * + * + * Portions Copyright (c) 1996-2015, PostgreSQL Global Development Group + * + * src/include/utils/cmdstatus.h + * + *------------------------------------------------------------------------- + */ +#ifndef CMDSTATUS_H +#define CMDSTATUS_H + +extern bool CmdStatusInfoRequested; + +extern void HandleCmdStatusInfoInterrupt(void); +extern void ProcessCmdStatusInfoRequest(void); + +#endif /* CMDSTATUS_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers