> For additional entertainment, I include patches that integrate > background sessions into dblink. So dblink can open a connection to a > background session, and then you can use the existing dblink functions > to send queries, read results, etc. People use dblink to make > self-connections to get autonomous subsessions, so this would directly > address that use case. The 0001 patch is some prerequisite refactoring > to remove an ugly macro mess, which is useful independent of this. 0002 > is the actual patch.
Updated patch, mainly with improved error handling and some tidying up. Related to this is also the patch in <https://www.postgresql.org/message-id/d100f62a-0606-accc-693b-cdc6d16b9...@2ndquadrant.com> as a resource control mechanism. -- Peter Eisentraut http://www.2ndQuadrant.com/ PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
>From 48058db2817b8955ea3424bc77b55bb5782ffb0d Mon Sep 17 00:00:00 2001 From: Peter Eisentraut <pete...@gmx.net> Date: Wed, 28 Dec 2016 12:00:00 -0500 Subject: [PATCH v3] Add background sessions This adds a C API to run SQL statements in a background worker, communicating by FE/BE protocol over a DSM message queue. This can be used to execute statements and transactions separate from the main foreground session. Also included is a PL/Python interface to this functionality. --- doc/src/sgml/bgsession.sgml | 273 +++++++ doc/src/sgml/filelist.sgml | 1 + doc/src/sgml/plpython.sgml | 105 ++- doc/src/sgml/postgres.sgml | 1 + doc/src/sgml/stylesheet-common.xsl | 1 + src/backend/commands/variable.c | 5 + src/backend/libpq/pqmq.c | 26 +- src/backend/storage/ipc/shm_mq.c | 19 + src/backend/tcop/Makefile | 2 +- src/backend/tcop/bgsession.c | 929 ++++++++++++++++++++++++ src/backend/tcop/postgres.c | 38 +- src/include/commands/variable.h | 1 + src/include/libpq/pqmq.h | 1 + src/include/storage/shm_mq.h | 3 + src/include/tcop/bgsession.h | 32 + src/include/tcop/tcopprot.h | 10 + src/pl/plpython/Makefile | 2 + src/pl/plpython/expected/plpython_bgsession.out | 217 ++++++ src/pl/plpython/expected/plpython_test.out | 7 +- src/pl/plpython/plpy_bgsession.c | 509 +++++++++++++ src/pl/plpython/plpy_bgsession.h | 18 + src/pl/plpython/plpy_main.h | 3 + src/pl/plpython/plpy_planobject.c | 1 + src/pl/plpython/plpy_planobject.h | 2 + src/pl/plpython/plpy_plpymodule.c | 5 + src/pl/plpython/plpy_spi.c | 29 +- src/pl/plpython/plpy_spi.h | 3 + src/pl/plpython/sql/plpython_bgsession.sql | 177 +++++ 28 files changed, 2388 insertions(+), 32 deletions(-) create mode 100644 doc/src/sgml/bgsession.sgml create mode 100644 src/backend/tcop/bgsession.c create mode 100644 src/include/tcop/bgsession.h create mode 100644 src/pl/plpython/expected/plpython_bgsession.out create mode 100644 src/pl/plpython/plpy_bgsession.c create mode 100644 src/pl/plpython/plpy_bgsession.h create mode 100644 src/pl/plpython/sql/plpython_bgsession.sql diff --git a/doc/src/sgml/bgsession.sgml b/doc/src/sgml/bgsession.sgml new file mode 100644 index 0000000000..14efb1b495 --- /dev/null +++ b/doc/src/sgml/bgsession.sgml @@ -0,0 +1,273 @@ +<!-- doc/src/sgml/bgsession.sgml --> + +<chapter id="bgsession"> + <title>Background Session API</title> + + <para> + The background session API is a C API for creating additional database + sessions in the background and running SQL statements in them. A background + session behaves like a normal (foreground) session in that it has session + state, transactions, can run SQL statements, and so on. Unlike a foreground + session, it is not connected directly to a client. Instead the foreground + session can use this API to execute SQL statements and retrieve their + results. Higher-level integrations, such as in procedural languages, can + make this functionality available to clients. Background sessions are + independent from their foreground sessions in their session and transaction + state. So a background session cannot see uncommitted data in foreground + sessions or vice versa, and there is no preferential treatment about + locking. Like all sessions, background sessions are separate processes. + Foreground and background sessions communicate over shared memory messages + queues instead of the sockets that a client/server connection uses. + </para> + + <para> + Background sessions can be useful in a variety of scenarios when effects + that are independent of the foreground session are to be achieved, for + example: + <itemizedlist> + <listitem> + <para> + Commit data independent of whether a foreground transaction commits, for + example for auditing. A trigger in the foreground session could effect + the necessary writes via a background session. + </para> + </listitem> + <listitem> + <para> + Large changes can be split up into smaller transactions. A foreground + session can orchestrate the logic, for example in a function, while the + actual writes and commits are executed in a background session. + </para> + </listitem> + </itemizedlist> + </para> + + <sect1 id="bgsession-types"> + <title>API</title> + + <para> + Use <literal>#include "tcop/bgsession.h"</literal> to include the API + declarations. + </para> + + <sect2> + <title>Types</title> + + <variablelist> + <varlistentry> + <term><type>BackgroundSession</type></term> + <listitem> + <para> + An opaque connection handle. Multiple background sessions can exist at + the same time, and each is prepresented by an instance of this type. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><type>BackgroundSessionPreparedStatement</type></term> + <listitem> + <para> + An opaque handle for a prepared statement, created when the statement + is prepared and used when the statement is executed. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><type>BackgroundSessionResult</type></term> + <listitem> + <para> + A handle for a query result, defined as: +<programlisting> +typedef struct BackgroundSessionResult +{ + TupleDesc tupdesc; + HeapTuple *tuples; + uint64 ntuples; + const char *command; +} BackgroundSessionResult; +</programlisting> + <structfield>tupdesc</structfield> describes the result + rows, <symbol>NULL</symbol> if the command does not return + rows. <structfield>tuples</structfield> is an array of + size <structfield>ntuples</structfield> with the result + rows. <structfield>command</structfield> is the tag of the executed + command. + </para> + </listitem> + </varlistentry> + </variablelist> + </sect2> + + <sect2> + <title>Functions</title> + + <variablelist> + <varlistentry> + <term> + <funcsynopsis> + <funcprototype> + <funcdef>BackgroundSession *<function>BackgroundSessionStart</function></funcdef> + <paramdef>void</paramdef> + </funcprototype> + </funcsynopsis> + </term> + <listitem> + <para> + Creates a background session. This starts the background worker and + establishes a connection to it. + </para> + + <para> + A background session does not automatically end when the foreground + transaction or session ends. + Use <function>BackgroundSessionEnd()</function> to end a background + session. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <funcsynopsis> + <funcprototype> + <funcdef>void <function>BackgroundSessionEnd</function></funcdef> + <paramdef>BackgroundSession *<parameter>session</parameter></paramdef> + </funcprototype> + </funcsynopsis> + </term> + <listitem> + <para> + Ends a background session. This closes the connection the background + worker. + </para> + + <para> + It is an error to close a background session with a transaction block + open. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <funcsynopsis> + <funcprototype> + <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecute</function></funcdef> + <paramdef>BackgroundSession *<parameter>session</parameter></paramdef> + <paramdef>const char *<parameter>sql</parameter></paramdef> + </funcprototype> + </funcsynopsis> + </term> + <listitem> + <para> + Execute an SQL statement and return the result. Access the fields of + the result structure to get details about the command result. If there + is an error, this function does not return. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <funcsynopsis> + <funcprototype> + <funcdef>void <function>BackgroundSessionSend</function></funcdef> + <paramdef>BackgroundSession *<parameter>session</parameter></paramdef> + <paramdef>const char *<parameter>sql</parameter></paramdef> + </funcprototype> + </funcsynopsis> + </term> + <listitem> + <para> + Execute an SQL statement, but don't wait for the result. The result + can then be fetched later + with <function>BackgroundSessionGetResult()</function>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <funcsynopsis> + <funcprototype> + <funcdef>BackgroundSessionResult *<function>BackgroundSessionSend</function></funcdef> + <paramdef>BackgroundSession *<parameter>session</parameter></paramdef> + </funcprototype> + </funcsynopsis> + </term> + <listitem> + <para> + Obtain the result of an SQL statement previously sent using + with <function>BackgroundSessionSend()</function>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <funcsynopsis> + <funcprototype> + <funcdef>BackgroundSessionPreparedStatement *<function>BackgroundSessionPrepare</function></funcdef> + <paramdef>BackgroundSession *<parameter>session</parameter></paramdef> + <paramdef>const char *<parameter>sql</parameter></paramdef> + <paramdef>int <parameter>nargs</parameter></paramdef> + <paramdef>Oid <parameter>argtypes</parameter>[]</paramdef> + <paramdef>const char *<parameter>argnames</parameter></paramdef> + </funcprototype> + </funcsynopsis> + </term> + <listitem> + <para> + Prepare an SQL statement for later execution + by <function>BackgroundSessionExecutePrepared()</function>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <funcsynopsis> + <funcprototype> + <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecutePrepared</function></funcdef> + <paramdef>BackgroundSessionPreparedStatement *<parameter>stmt</parameter></paramdef> + <paramdef>int <parameter>nargs</parameter></paramdef> + <paramdef>Datum <parameter>values</parameter>[]</paramdef> + <paramdef>bool <parameter>nulls</parameter>[]</paramdef> + </funcprototype> + </funcsynopsis> + </term> + <listitem> + <para> + Execute a statement previously prepared by + <function>BackgroundSessionPrepare()</function>. + </para> + </listitem> + </varlistentry> + </variablelist> + + <para> + Here is a very simple example: + +<programlisting> +#include "tcop/bgsession.h" + +void +myfunc() +{ + BackgroundSession *session; + BackgroundSessionResult *result; + + session = BackgroundSessionStart(); + + result = BackgroundSessionExecute(session, "SELECT ..."); + elog(INFO, "returned %d rows", list_length(result->tuples)); + + BackgroundSessionEnd(session); +} +</programlisting> + </para> + </sect2> + </sect1> +</chapter> diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml index e7aa92f914..cdf628e913 100644 --- a/doc/src/sgml/filelist.sgml +++ b/doc/src/sgml/filelist.sgml @@ -53,6 +53,7 @@ <!ENTITY logical-replication SYSTEM "logical-replication.sgml"> <!-- programmer's guide --> +<!ENTITY bgsession SYSTEM "bgsession.sgml"> <!ENTITY bgworker SYSTEM "bgworker.sgml"> <!ENTITY dfunc SYSTEM "dfunc.sgml"> <!ENTITY ecpg SYSTEM "ecpg.sgml"> diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml index 46397781be..b36f246c0c 100644 --- a/doc/src/sgml/plpython.sgml +++ b/doc/src/sgml/plpython.sgml @@ -966,7 +966,8 @@ <title>Database Access Functions</title> <term><literal><function>status</function>()</literal></term> <listitem> <para> - The <function>SPI_execute()</function> return value. + The <function>SPI_execute()</function> return value, or 0 if not + applicable. </para> </listitem> </varlistentry> @@ -1357,6 +1358,108 @@ <title>Older Python Versions</title> </sect2> </sect1> + <sect1 id="plpython-bgsession"> + <title>Background Sessions</title> + + <para> + PL/Python exposes the background session interface described + in <xref linkend="bgsession"> as a Python API. A background session is + represented by a Python object of type <type>plpy.BackgroundSession</type>. + It has these methods: + + <variablelist> + <varlistentry> + <term><literal>.close()</literal></term> + <listitem> + <para> + Close the session. This must be called or the session will leak. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>.execute(<parameter>sql</parameter>)</literal></term> + <listitem> + <para> + Execute an SQL statement. The return value is a result object of the + same type as returned by <literal>plpy.execute</literal>. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>.prepare(<parameter>sql</parameter>, <replaceable>types</replaceable>)</literal></term> + <listitem> + <para> + Prepare an SQL statement. The parameter data types are specified as + for <literal>plpy.prepare</literal>. The return value is a plan + object. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>.execute_prepared(<parameter>plan</parameter>, <replaceable>values</replaceable>)</literal></term> + <listitem> + <para> + Execute a prepared statement. The parameter values are specified as + for <literal>plpy.execute</literal>. The return value is a result + object. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>.__enter__()</literal></term> + <listitem> + <para> + This just returns self. It is supplied so that + a <classname>BackgroundSession</classname> object can be used as a + context manager. See example below. + </para> + </listitem> + </varlistentry> + + <varlistentry> + <term><literal>.__exit__(<parameter>exc_type</parameter>, <parameter>exc</parameter>, <parameter>tb</parameter>)</literal></term> + <listitem> + <para> + This is the same as <literal>.close()</literal>. It is supplied so + that a <classname>BackgroundSession</classname> object can be used as a + context manager. (The arguments are ignored.) + </para> + </listitem> + </varlistentry> + </variablelist> + </para> + + <para> + A <classname>BackgroundSession</classname> object can be preserved across + function calls or passed between functions via the <varname>SD</varname> + and <varname>GD</varname> variables. + </para> + + <para> + Example: +<programlisting> +CREATE FUNCTION bulkload() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + for i in range(1, 1001): + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 100 == 0: + a.execute("COMMIT") + +return 0 +$$; +</programlisting> + This function inserts 1000 values into the table and commits after every + 100 values. + </para> + </sect1> + <sect1 id="plpython-util"> <title>Utility Functions</title> <para> diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml index 4e169d1b18..50f9c3964f 100644 --- a/doc/src/sgml/postgres.sgml +++ b/doc/src/sgml/postgres.sgml @@ -221,6 +221,7 @@ <title>Server Programming</title> &spi; &bgworker; + &bgsession; &logicaldecoding; &replication-origins; diff --git a/doc/src/sgml/stylesheet-common.xsl b/doc/src/sgml/stylesheet-common.xsl index c23d38f128..e620f87ef0 100644 --- a/doc/src/sgml/stylesheet-common.xsl +++ b/doc/src/sgml/stylesheet-common.xsl @@ -28,6 +28,7 @@ </xsl:param> <xsl:param name="callout.graphics" select="'0'"></xsl:param> +<xsl:param name="funcsynopsis.style">ansi</xsl:param> <xsl:param name="toc.section.depth">2</xsl:param> <xsl:param name="linenumbering.extension" select="'0'"></xsl:param> <xsl:param name="section.autolabel" select="1"></xsl:param> diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c index d75bddd87b..8be22a1b0f 100644 --- a/src/backend/commands/variable.c +++ b/src/backend/commands/variable.c @@ -671,12 +671,17 @@ show_random_seed(void) * SET CLIENT_ENCODING */ +void (*check_client_encoding_hook)(void); + bool check_client_encoding(char **newval, void **extra, GucSource source) { int encoding; const char *canonical_name; + if (check_client_encoding_hook) + check_client_encoding_hook(); + /* Look up the encoding by name */ encoding = pg_valid_client_encoding(*newval); if (encoding < 0) diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 96939327c3..412d894b2b 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -48,6 +48,11 @@ static PQcommMethods PqCommMqMethods = { mq_endcopyout }; +static PQcommMethods *save_PqCommMethods; +static CommandDest save_whereToSendOutput; +static ProtocolVersion save_FrontendProtocol; +static dsm_segment *save_seg; + /* * Arrange to redirect frontend/backend protocol messages to a shared-memory * message queue. @@ -55,12 +60,30 @@ static PQcommMethods PqCommMqMethods = { void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh) { + save_PqCommMethods = PqCommMethods; + save_whereToSendOutput = whereToSendOutput; + save_FrontendProtocol = FrontendProtocol; + PqCommMethods = &PqCommMqMethods; pq_mq = shm_mq_get_queue(mqh); pq_mq_handle = mqh; whereToSendOutput = DestRemote; FrontendProtocol = PG_PROTOCOL_LATEST; on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0); + + save_seg = seg; +} + +void +pq_stop_redirect_to_shm_mq(void) +{ + cancel_on_dsm_detach(save_seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0); + PqCommMethods = save_PqCommMethods; + whereToSendOutput = save_whereToSendOutput; + FrontendProtocol = save_FrontendProtocol; + pq_mq = NULL; + pq_mq_handle = NULL; + save_seg = NULL; } /* @@ -182,7 +205,8 @@ mq_putmessage(char msgtype, const char *s, size_t len) Assert(result == SHM_MQ_SUCCESS || result == SHM_MQ_DETACHED); if (result != SHM_MQ_SUCCESS) - return EOF; + ereport(COMMERROR, + (errmsg("could not send on message queue: %s", shm_mq_strerror(result)))); return 0; } diff --git a/src/backend/storage/ipc/shm_mq.c b/src/backend/storage/ipc/shm_mq.c index f5bf807cd6..99ad9854df 100644 --- a/src/backend/storage/ipc/shm_mq.c +++ b/src/backend/storage/ipc/shm_mq.c @@ -805,6 +805,25 @@ shm_mq_get_queue(shm_mq_handle *mqh) } /* + * Get error message string. + */ +const char * +shm_mq_strerror(shm_mq_result res) +{ + switch (res) + { + case SHM_MQ_SUCCESS: + return gettext_noop("Success"); + case SHM_MQ_WOULD_BLOCK: + return gettext_noop("Operation would block"); + case SHM_MQ_DETACHED: + return gettext_noop("Other process has detached queue"); + default: + return gettext_noop("Unknown error"); + } +} + +/* * Write bytes into a shared message queue. */ static shm_mq_result diff --git a/src/backend/tcop/Makefile b/src/backend/tcop/Makefile index 674302feb7..ab267673a3 100644 --- a/src/backend/tcop/Makefile +++ b/src/backend/tcop/Makefile @@ -12,7 +12,7 @@ subdir = src/backend/tcop top_builddir = ../../.. include $(top_builddir)/src/Makefile.global -OBJS= dest.o fastpath.o postgres.o pquery.o utility.o +OBJS= bgsession.o dest.o fastpath.o postgres.o pquery.o utility.o ifneq (,$(filter $(PORTNAME),cygwin win32)) override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT) diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c new file mode 100644 index 0000000000..7578e93e9c --- /dev/null +++ b/src/backend/tcop/bgsession.c @@ -0,0 +1,929 @@ +/*-------------------------------------------------------------------------- + * + * bgsession.c + * Run SQL commands using a background worker. + * + * Copyright (C) 2016, PostgreSQL Global Development Group + * + * IDENTIFICATION + * src/backend/tcop/bgsession.c + * + * + * This implements a C API to open a background session and run SQL queries + * in it. The session looks much like a normal database connection, but it is + * always to the same database, and there is no authentication needed. The + * "backend" for that connection is a background worker. The normal backend + * and the background session worker communicate over the normal FE/BE + * protocol. + * + * Types: + * + * BackgroundSession -- opaque connection handle + * BackgroundSessionPreparedStatement -- opaque prepared statement handle + * BackgroundSessionResult -- query result + * + * Functions: + * + * BackgroundSessionStart() -- start a session (launches background worker) + * and return a handle + * + * BackgroundSessionEnd() -- close session and free resources + * + * BackgroundSessionExecute() -- run SQL string and return result (rows or + * status) + * + * BackgroundSessionSend() -- run SQL string without waiting for result + * + * BackgroundSessionGetResult() -- get result from prior ...Send() + * + * BackgroundSessionPrepare() -- prepare an SQL string for subsequent + * execution + * + * BackgroundSessionExecutePrepared() -- run prepared statement + * + * ------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/tupdesc.h" +#include "access/xact.h" +#include "commands/async.h" +#include "commands/variable.h" +#include "lib/stringinfo.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqmq.h" +#include "mb/pg_wchar.h" +#include "miscadmin.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "storage/dsm.h" +#include "storage/shm_mq.h" +#include "storage/shm_toc.h" +#include "tcop/bgsession.h" +#include "tcop/tcopprot.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/resowner.h" + +/* Table-of-contents constants for our dynamic shared memory segment. */ +#define BGSESSION_MAGIC 0x50674267 + +#define BGSESSION_KEY_FIXED_DATA 0 +#define BGSESSION_KEY_GUC 1 +#define BGSESSION_KEY_COMMAND_QUEUE 2 +#define BGSESSION_KEY_RESPONSE_QUEUE 3 +#define BGSESSION_NKEYS 4 + +#define BGSESSION_QUEUE_SIZE 16384 + +/* Fixed-size data passed via our dynamic shared memory segment. */ +typedef struct background_session_fixed_data +{ + Oid database_id; + Oid authenticated_user_id; + Oid current_user_id; + int sec_context; +} background_session_fixed_data; + +struct BackgroundSession +{ + ResourceOwner resowner; + dsm_segment *seg; + BackgroundWorkerHandle *worker_handle; + shm_mq_handle *command_qh; + shm_mq_handle *response_qh; + int transaction_status; +}; + +struct BackgroundSessionPreparedStatement +{ + BackgroundSession *session; + Oid *argtypes; + TupleDesc tupdesc; +}; + +static void bgsession_worker_main(Datum main_arg); +static void shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg); +static void bgsession_check_client_encoding_hook(void); +static TupleDesc TupleDesc_from_RowDescription(StringInfo msg); +static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg); +static void forward_NotifyResponse(StringInfo msg); +static void rethrow_errornotice(StringInfo msg); +static void invalid_protocol_message(char msgtype) pg_attribute_noreturn(); + + +BackgroundSession * +BackgroundSessionStart(void) +{ + ResourceOwner oldowner; + BackgroundWorker worker; + pid_t pid; + BackgroundSession *session; + shm_toc_estimator e; + Size segsize; + Size guc_len; + char *gucstate; + dsm_segment *seg; + shm_toc *toc; + background_session_fixed_data *fdata; + shm_mq *command_mq; + shm_mq *response_mq; + BgwHandleStatus bgwstatus; + StringInfoData msg; + char msgtype; + + session = palloc(sizeof(*session)); + + session->resowner = ResourceOwnerCreate(NULL, "background session"); + + shm_toc_initialize_estimator(&e); + shm_toc_estimate_chunk(&e, sizeof(background_session_fixed_data)); + shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE); + shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE); + guc_len = EstimateGUCStateSpace(); + shm_toc_estimate_chunk(&e, guc_len); + shm_toc_estimate_keys(&e, BGSESSION_NKEYS); + segsize = shm_toc_estimate(&e); + oldowner = CurrentResourceOwner; + PG_TRY(); + { + CurrentResourceOwner = session->resowner; + seg = dsm_create(segsize, 0); + } + PG_CATCH(); + { + CurrentResourceOwner = oldowner; + PG_RE_THROW(); + } + PG_END_TRY(); + CurrentResourceOwner = oldowner; + + session->seg = seg; + + toc = shm_toc_create(BGSESSION_MAGIC, dsm_segment_address(seg), segsize); + + /* Store fixed-size data in dynamic shared memory. */ + fdata = shm_toc_allocate(toc, sizeof(*fdata)); + fdata->database_id = MyDatabaseId; + fdata->authenticated_user_id = GetAuthenticatedUserId(); + GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context); + shm_toc_insert(toc, BGSESSION_KEY_FIXED_DATA, fdata); + + /* Store GUC state in dynamic shared memory. */ + gucstate = shm_toc_allocate(toc, guc_len); + SerializeGUCState(guc_len, gucstate); + shm_toc_insert(toc, BGSESSION_KEY_GUC, gucstate); + + command_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE), + BGSESSION_QUEUE_SIZE); + shm_toc_insert(toc, BGSESSION_KEY_COMMAND_QUEUE, command_mq); + shm_mq_set_sender(command_mq, MyProc); + + response_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE), + BGSESSION_QUEUE_SIZE); + shm_toc_insert(toc, BGSESSION_KEY_RESPONSE_QUEUE, response_mq); + shm_mq_set_receiver(response_mq, MyProc); + + session->command_qh = shm_mq_attach(command_mq, seg, NULL); + session->response_qh = shm_mq_attach(response_mq, seg, NULL); + + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + worker.bgw_restart_time = BGW_NEVER_RESTART; + worker.bgw_main = bgsession_worker_main; + snprintf(worker.bgw_name, BGW_MAXLEN, "background session by PID %d", MyProcPid); + worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle)) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not register background process"), + errhint("You might need to increase max_worker_processes."))); + + shm_mq_set_handle(session->command_qh, session->worker_handle); + shm_mq_set_handle(session->response_qh, session->worker_handle); + + bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid); + if (bgwstatus != BGWH_STARTED) + ereport(ERROR, + (errcode(ERRCODE_INSUFFICIENT_RESOURCES), + errmsg("could not start background worker"))); + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'E': + case 'N': + rethrow_errornotice(&msg); + break; + case 'Z': + session->transaction_status = pq_getmsgbyte(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'Z'); + + return session; +} + + +void +BackgroundSessionEnd(BackgroundSession *session) +{ + StringInfoData msg; + + if (session->transaction_status == 'T') + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("background session ended with transaction block open"))); + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'X'); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + pfree(session->worker_handle); + dsm_detach(session->seg); + ResourceOwnerRelease(session->resowner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false); + ResourceOwnerDelete(session->resowner); + pfree(session); +} + + +void +BackgroundSessionSend(BackgroundSession *session, const char *sql) +{ + StringInfoData msg; + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'Q'); + pq_sendstring(&msg, sql); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); +} + + +static void +bgresult_store_tuple(BackgroundSessionResult *result, HeapTuple tuple) +{ + AssertArg(result); + Assert(result->ntuples <= result->alloced); + + if (result->ntuples >= result->alloced) + { + uint64 newsize = (result->alloced > 0) ? result->alloced * 2 : 128; + + if (result->tuples == NULL) + result->tuples = palloc(newsize * sizeof(*result->tuples)); + else + result->tuples = repalloc(result->tuples, newsize * sizeof(*result->tuples)); + result->alloced = newsize; + } + + result->tuples[result->ntuples++] = tuple; +} + + +BackgroundSessionResult * +BackgroundSessionGetResult(BackgroundSession *session) +{ + StringInfoData msg; + char msgtype; + BackgroundSessionResult *result; + + result = palloc0(sizeof(*result)); + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'C': + { + const char *tag = pq_getmsgstring(&msg); + result->command = pstrdup(tag); + pq_getmsgend(&msg); + break; + } + case 'D': + if (!result->tupdesc) + elog(ERROR, "no T before D"); + bgresult_store_tuple(result, HeapTuple_from_DataRow(result->tupdesc, &msg)); + pq_getmsgend(&msg); + break; + case 'E': + case 'N': + rethrow_errornotice(&msg); + break; + case 'T': + if (result->tupdesc) + elog(ERROR, "already received a T message"); + result->tupdesc = TupleDesc_from_RowDescription(&msg); + pq_getmsgend(&msg); + break; + case 'Z': + session->transaction_status = pq_getmsgbyte(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'Z'); + + return result; +} + + +BackgroundSessionResult * +BackgroundSessionExecute(BackgroundSession *session, const char *sql) +{ + BackgroundSessionSend(session, sql); + return BackgroundSessionGetResult(session); +} + + +BackgroundSessionPreparedStatement * +BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs, + Oid argtypes[], const char *argnames[]) +{ + BackgroundSessionPreparedStatement *result; + StringInfoData msg; + int i; + char msgtype; + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'P'); + pq_sendstring(&msg, ""); + pq_sendstring(&msg, sql); + pq_sendint(&msg, nargs, 2); + for (i = 0; i < nargs; i++) + pq_sendint(&msg, argtypes[i], 4); + if (argnames) + for (i = 0; i < nargs; i++) + pq_sendstring(&msg, argnames[i]); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + result = palloc0(sizeof(*result)); + result->session = session; + result->argtypes = palloc(nargs * sizeof(*result->argtypes)); + memcpy(result->argtypes, argtypes, nargs * sizeof(*result->argtypes)); + + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case '1': + break; + case 'E': + rethrow_errornotice(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'D'); + pq_sendbyte(&msg, 'S'); + pq_sendstring(&msg, ""); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'E': + rethrow_errornotice(&msg); + break; + case 'n': + break; + case 't': + /* ignore for now */ + break; + case 'T': + if (result->tupdesc) + elog(ERROR, "already received a T message"); + result->tupdesc = TupleDesc_from_RowDescription(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'n' && msgtype != 'T'); + + return result; +} + + +BackgroundSessionResult * +BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[]) +{ + BackgroundSession *session; + StringInfoData msg; + BackgroundSessionResult *result; + char msgtype; + int i; + + session = stmt->session; + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'B'); + pq_sendstring(&msg, ""); + pq_sendstring(&msg, ""); + pq_sendint(&msg, 1, 2); /* number of parameter format codes */ + pq_sendint(&msg, 1, 2); + pq_sendint(&msg, nargs, 2); /* number of parameter values */ + for (i = 0; i < nargs; i++) + { + if (nulls[i]) + pq_sendint(&msg, -1, 4); + else + { + Oid typsend; + bool typisvarlena; + bytea *outputbytes; + + getTypeBinaryOutputInfo(stmt->argtypes[i], &typsend, &typisvarlena); + outputbytes = OidSendFunctionCall(typsend, values[i]); + pq_sendint(&msg, VARSIZE(outputbytes) - VARHDRSZ, 4); + pq_sendbytes(&msg, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ); + pfree(outputbytes); + } + } + pq_sendint(&msg, 1, 2); /* number of result column format codes */ + pq_sendint(&msg, 1, 2); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case '2': + break; + case 'E': + rethrow_errornotice(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_beginmessage(&msg, 'E'); + pq_sendstring(&msg, ""); + pq_sendint(&msg, 0, 4); + pq_endmessage(&msg); + pq_stop_redirect_to_shm_mq(); + + result = palloc0(sizeof(*result)); + result->tupdesc = stmt->tupdesc; + + do + { + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'C': + { + const char *tag = pq_getmsgstring(&msg); + result->command = pstrdup(tag); + pq_getmsgend(&msg); + break; + } + case 'D': + if (!stmt->tupdesc) + elog(ERROR, "did not expect any rows"); + bgresult_store_tuple(result, HeapTuple_from_DataRow(stmt->tupdesc, &msg)); + pq_getmsgend(&msg); + break; + case 'E': + case 'N': + rethrow_errornotice(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + } + while (msgtype != 'C'); + + pq_redirect_to_shm_mq(session->seg, session->command_qh); + pq_putemptymessage('S'); + pq_stop_redirect_to_shm_mq(); + + shm_mq_receive_stringinfo(session->response_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'A': + forward_NotifyResponse(&msg); + break; + case 'Z': + session->transaction_status = pq_getmsgbyte(&msg); + pq_getmsgend(&msg); + break; + default: + invalid_protocol_message(msgtype); + break; + } + + return result; +} + + +static void +bgsession_worker_main(Datum main_arg) +{ + dsm_segment *seg; + shm_toc *toc; + background_session_fixed_data *fdata; + char *gucstate; + shm_mq *command_mq; + shm_mq *response_mq; + shm_mq_handle *command_qh; + shm_mq_handle *response_qh; + StringInfoData msg; + char msgtype; + + pqsignal(SIGTERM, die); + BackgroundWorkerUnblockSignals(); + + /* Set up a memory context and resource owner. */ + Assert(CurrentResourceOwner == NULL); + CurrentResourceOwner = ResourceOwnerCreate(NULL, "background session worker"); + CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext, + "background session", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + seg = dsm_attach(DatumGetInt32(main_arg)); + if (seg == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not map dynamic shared memory segment"))); + + toc = shm_toc_attach(BGSESSION_MAGIC, dsm_segment_address(seg)); + if (toc == NULL) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("bad magic number in dynamic shared memory segment"))); + + /* Find data structures in dynamic shared memory. */ + fdata = shm_toc_lookup(toc, BGSESSION_KEY_FIXED_DATA); + + gucstate = shm_toc_lookup(toc, BGSESSION_KEY_GUC); + + command_mq = shm_toc_lookup(toc, BGSESSION_KEY_COMMAND_QUEUE); + shm_mq_set_receiver(command_mq, MyProc); + command_qh = shm_mq_attach(command_mq, seg, NULL); + + response_mq = shm_toc_lookup(toc, BGSESSION_KEY_RESPONSE_QUEUE); + shm_mq_set_sender(response_mq, MyProc); + response_qh = shm_mq_attach(response_mq, seg, NULL); + + pq_redirect_to_shm_mq(seg, response_qh); + BackgroundWorkerInitializeConnectionByOid(fdata->database_id, + fdata->authenticated_user_id); + + SetClientEncoding(GetDatabaseEncoding()); + + StartTransactionCommand(); + RestoreGUCState(gucstate); + CommitTransactionCommand(); + + process_session_preload_libraries(); + + SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context); + + whereToSendOutput = DestRemote; + ReadyForQuery(whereToSendOutput); + + MessageContext = AllocSetContextCreate(TopMemoryContext, + "MessageContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + do + { + PG_TRY(); + { + MemoryContextSwitchTo(MessageContext); + MemoryContextResetAndDeleteChildren(MessageContext); + + ProcessCompletedNotifies(); + pgstat_report_stat(false); + pgstat_report_activity(STATE_IDLE, NULL); + + shm_mq_receive_stringinfo(command_qh, &msg); + msgtype = pq_getmsgbyte(&msg); + + switch (msgtype) + { + case 'B': + { + SetCurrentStatementStartTimestamp(); + exec_bind_message(&msg); + break; + } + case 'D': + { + int describe_type; + const char *describe_target; + + SetCurrentStatementStartTimestamp(); + + describe_type = pq_getmsgbyte(&msg); + describe_target = pq_getmsgstring(&msg); + pq_getmsgend(&msg); + + switch (describe_type) + { + case 'S': + exec_describe_statement_message(describe_target); + break; +#ifdef TODO + case 'P': + exec_describe_portal_message(describe_target); + break; +#endif + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid DESCRIBE message subtype %d", + describe_type))); + break; + } + } + break; + case 'E': + { + const char *portal_name; + int max_rows; + + SetCurrentStatementStartTimestamp(); + + portal_name = pq_getmsgstring(&msg); + max_rows = pq_getmsgint(&msg, 4); + pq_getmsgend(&msg); + + exec_execute_message(portal_name, max_rows); + } + break; + + case 'P': + { + const char *stmt_name; + const char *query_string; + int numParams; + Oid *paramTypes = NULL; + + SetCurrentStatementStartTimestamp(); + + stmt_name = pq_getmsgstring(&msg); + query_string = pq_getmsgstring(&msg); + numParams = pq_getmsgint(&msg, 2); + if (numParams > 0) + { + int i; + + paramTypes = palloc(numParams * sizeof(Oid)); + for (i = 0; i < numParams; i++) + paramTypes[i] = pq_getmsgint(&msg, 4); + } + pq_getmsgend(&msg); + + exec_parse_message(query_string, stmt_name, + paramTypes, numParams); + break; + } + case 'Q': + { + const char *sql; + + sql = pq_getmsgstring(&msg); + pq_getmsgend(&msg); + + check_client_encoding_hook = bgsession_check_client_encoding_hook; + + SetCurrentStatementStartTimestamp(); + exec_simple_query(sql, 1); + + check_client_encoding_hook = NULL; + + ReadyForQuery(whereToSendOutput); + break; + } + case 'S': + { + pq_getmsgend(&msg); + finish_xact_command(); + ReadyForQuery(whereToSendOutput); + break; + } + case 'X': + break; + default: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid protocol message type from background session leader: %c", + msgtype))); + break; + } + } + PG_CATCH(); + { + EmitErrorReport(); + AbortCurrentTransaction(); + MemoryContextSwitchTo(TopMemoryContext); + FlushErrorState(); + reset_xact_command(); + } + PG_END_TRY(); + } + while (msgtype != 'X'); +} + + +static void +shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg) +{ + shm_mq_result res; + Size nbytes; + void *data; + + res = shm_mq_receive(qh, &nbytes, &data, false); + if (res != SHM_MQ_SUCCESS) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not read from message queue: %s", shm_mq_strerror(res)))); + + initStringInfo(msg); + appendBinaryStringInfo(msg, data, nbytes); +} + + +static void +bgsession_check_client_encoding_hook(void) +{ + elog(ERROR, "cannot set client encoding in background session"); +} + + +static TupleDesc +TupleDesc_from_RowDescription(StringInfo msg) +{ + TupleDesc tupdesc; + int natts = pq_getmsgint(msg, 2); + int i; + + tupdesc = CreateTemplateTupleDesc(natts, false); + for (i = 0; i < natts; i++) + { + const char *colname; + Oid type_oid; + int32 typmod; + int16 format; + + colname = pq_getmsgstring(msg); + (void) pq_getmsgint(msg, 4); /* table OID */ + (void) pq_getmsgint(msg, 2); /* table attnum */ + type_oid = pq_getmsgint(msg, 4); + (void) pq_getmsgint(msg, 2); /* type length */ + typmod = pq_getmsgint(msg, 4); + format = pq_getmsgint(msg, 2); + (void) format; +#ifdef TODO + /* XXX The protocol sometimes sends 0 (text) if the format is not + * determined yet. We always use binary, so this check is probably + * not useful. */ + if (format != 1) + elog(ERROR, "format must be binary"); +#endif + + TupleDescInitEntry(tupdesc, i + 1, colname, type_oid, typmod, 0); + } + return tupdesc; +} + + +static HeapTuple +HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg) +{ + int natts = pq_getmsgint(msg, 2); + int i; + Datum *values; + bool *nulls; + StringInfoData buf; + + Assert(tupdesc); + + if (natts != tupdesc->natts) + elog(ERROR, "malformed DataRow"); + + values = palloc(natts * sizeof(*values)); + nulls = palloc(natts * sizeof(*nulls)); + initStringInfo(&buf); + + for (i = 0; i < natts; i++) + { + int32 len = pq_getmsgint(msg, 4); + + if (len < 0) + nulls[i] = true; + else + { + Oid recvid; + Oid typioparams; + + nulls[i] = false; + + getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid, + &recvid, + &typioparams); + resetStringInfo(&buf); + appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, len), len); + values[i] = OidReceiveFunctionCall(recvid, &buf, typioparams, + tupdesc->attrs[i]->atttypmod); + } + } + + return heap_form_tuple(tupdesc, values, nulls); +} + + +static void +forward_NotifyResponse(StringInfo msg) +{ + int32 pid; + const char *channel; + const char *payload; + + pid = pq_getmsgint(msg, 4); + channel = pq_getmsgrawstring(msg); + payload = pq_getmsgrawstring(msg); + pq_getmsgend(msg); + + NotifyMyFrontEnd(channel, payload, pid); +} + + +static void +rethrow_errornotice(StringInfo msg) +{ + ErrorData edata; + + pq_parse_errornotice(msg, &edata); + edata.elevel = Min(edata.elevel, ERROR); + ThrowErrorData(&edata); +} + + +static void +invalid_protocol_message(char msgtype) +{ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid protocol message type from background session: %c", + msgtype))); +} diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index b07d6c6cb9..2e5b601a5e 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -180,8 +180,6 @@ static int errdetail_execute(List *raw_parsetree_list); static int errdetail_params(ParamListInfo params); static int errdetail_abort(void); static int errdetail_recovery_conflict(void); -static void start_xact_command(void); -static void finish_xact_command(void); static bool IsTransactionExitStmt(Node *parsetree); static bool IsTransactionExitStmtList(List *pstmts); static bool IsTransactionStmtList(List *pstmts); @@ -876,8 +874,8 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams) * * Execute a "simple Query" protocol message. */ -static void -exec_simple_query(const char *query_string) +void +exec_simple_query(const char *query_string, int16 format) { CommandDest dest = whereToSendOutput; MemoryContext oldcontext; @@ -970,7 +968,6 @@ exec_simple_query(const char *query_string) *plantree_list; Portal portal; DestReceiver *receiver; - int16 format; /* * Get the command name for use in status display (it also becomes the @@ -1061,6 +1058,8 @@ exec_simple_query(const char *query_string) */ PortalStart(portal, NULL, 0, InvalidSnapshot); + if (format < 0) + { /* * Select the appropriate output format: text unless we are doing a * FETCH from a binary cursor. (Pretty grotty to have to do this here @@ -1081,6 +1080,7 @@ exec_simple_query(const char *query_string) format = 1; /* BINARY */ } } + } PortalSetResultFormat(portal, 1, &format); /* @@ -1192,7 +1192,7 @@ exec_simple_query(const char *query_string) * * Execute a "Parse" protocol message. */ -static void +void exec_parse_message(const char *query_string, /* string to execute */ const char *stmt_name, /* name for prepared stmt */ Oid *paramTypes, /* parameter types */ @@ -1454,7 +1454,7 @@ exec_parse_message(const char *query_string, /* string to execute */ * * Process a "Bind" message to create a portal from a prepared statement */ -static void +void exec_bind_message(StringInfo input_message) { const char *portal_name; @@ -1837,7 +1837,7 @@ exec_bind_message(StringInfo input_message) * * Process an "Execute" message for a portal */ -static void +void exec_execute_message(const char *portal_name, long max_rows) { CommandDest dest; @@ -2064,6 +2064,9 @@ check_log_statement(List *stmt_list) { ListCell *stmt_item; + if (IsBackgroundWorker) + return false; + if (log_statement == LOGSTMT_NONE) return false; if (log_statement == LOGSTMT_ALL) @@ -2099,6 +2102,9 @@ check_log_statement(List *stmt_list) int check_log_duration(char *msec_str, bool was_logged) { + if (IsBackgroundWorker) + return false; + if (log_duration || log_min_duration_statement >= 0) { long secs; @@ -2286,7 +2292,7 @@ errdetail_recovery_conflict(void) * * Process a "Describe" message for a prepared statement */ -static void +void exec_describe_statement_message(const char *stmt_name) { CachedPlanSource *psrc; @@ -2430,7 +2436,7 @@ exec_describe_portal_message(const char *portal_name) /* * Convenience routines for starting/committing a single command. */ -static void +void start_xact_command(void) { if (!xact_started) @@ -2448,7 +2454,7 @@ start_xact_command(void) } } -static void +void finish_xact_command(void) { if (xact_started) @@ -2473,6 +2479,12 @@ finish_xact_command(void) } } +void +reset_xact_command(void) +{ + xact_started = false; +} + /* * Convenience routines for checking whether a statement is one of the @@ -3891,7 +3903,7 @@ PostgresMain(int argc, char *argv[], ignore_till_sync = true; /* We don't have a transaction command open anymore */ - xact_started = false; + reset_xact_command(); /* * If an error occurred while we were reading a message from the @@ -4063,7 +4075,7 @@ PostgresMain(int argc, char *argv[], if (am_walsender) exec_replication_command(query_string); else - exec_simple_query(query_string); + exec_simple_query(query_string, -1); send_ready_for_query = true; } diff --git a/src/include/commands/variable.h b/src/include/commands/variable.h index 247423c6fb..2a41d6e59c 100644 --- a/src/include/commands/variable.h +++ b/src/include/commands/variable.h @@ -29,6 +29,7 @@ extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource s extern bool check_random_seed(double *newval, void **extra, GucSource source); extern void assign_random_seed(double newval, void *extra); extern const char *show_random_seed(void); +extern void (*check_client_encoding_hook)(void); extern bool check_client_encoding(char **newval, void **extra, GucSource source); extern void assign_client_encoding(const char *newval, void *extra); extern bool check_session_authorization(char **newval, void **extra, GucSource source); diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h index e356bd60f4..b5bc7abb5a 100644 --- a/src/include/libpq/pqmq.h +++ b/src/include/libpq/pqmq.h @@ -17,6 +17,7 @@ #include "storage/shm_mq.h" extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh); +extern void pq_stop_redirect_to_shm_mq(void); extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); diff --git a/src/include/storage/shm_mq.h b/src/include/storage/shm_mq.h index 7a37535ab3..9c1d54ef5d 100644 --- a/src/include/storage/shm_mq.h +++ b/src/include/storage/shm_mq.h @@ -79,6 +79,9 @@ extern shm_mq_result shm_mq_receive(shm_mq_handle *mqh, /* Wait for our counterparty to attach to the queue. */ extern shm_mq_result shm_mq_wait_for_attach(shm_mq_handle *mqh); +/* Get error message string. */ +extern const char *shm_mq_strerror(shm_mq_result res); + /* Smallest possible queue. */ extern PGDLLIMPORT const Size shm_mq_minimum_size; diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h new file mode 100644 index 0000000000..744681ef79 --- /dev/null +++ b/src/include/tcop/bgsession.h @@ -0,0 +1,32 @@ +#ifndef BGSESSION_H +#define BGSESSION_H + +#include "access/htup.h" +#include "access/tupdesc.h" + +struct BackgroundSession; +typedef struct BackgroundSession BackgroundSession; + +struct BackgroundSessionPreparedStatement; +typedef struct BackgroundSessionPreparedStatement BackgroundSessionPreparedStatement; + +typedef struct BackgroundSessionResult +{ + TupleDesc tupdesc; + HeapTuple *tuples; + uint64 ntuples; + uint64 alloced; + const char *command; +} BackgroundSessionResult; + +BackgroundSession *BackgroundSessionStart(void); +void BackgroundSessionEnd(BackgroundSession *session); + +void BackgroundSessionSend(BackgroundSession *session, const char *sql); +BackgroundSessionResult *BackgroundSessionGetResult(BackgroundSession *session); +BackgroundSessionResult *BackgroundSessionExecute(BackgroundSession *session, const char *sql); + +BackgroundSessionPreparedStatement *BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs, Oid argtypes[], const char *argnames[]); +BackgroundSessionResult *BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[]); + +#endif /* BGSESSION_H */ diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 1958be85b7..cea599274e 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -58,6 +58,12 @@ extern PlannedStmt *pg_plan_query(Query *querytree, int cursorOptions, ParamListInfo boundParams); extern List *pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams); +extern void exec_simple_query(const char *query_string, int16 format); +extern void exec_parse_message(const char *query_string, const char *stmt_name, + Oid paramTypes[], int numParams); +extern void exec_bind_message(StringInfo input_message); +extern void exec_execute_message(const char *portal_name, long max_rows); +extern void exec_describe_statement_message(const char *stmt_name); extern bool check_max_stack_depth(int *newval, void **extra, GucSource source); extern void assign_max_stack_depth(int newval, void *extra); @@ -71,6 +77,10 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S extern void ProcessClientReadInterrupt(bool blocked); extern void ProcessClientWriteInterrupt(bool blocked); +extern void start_xact_command(void); +extern void finish_xact_command(void); +extern void reset_xact_command(void); + extern void process_postgres_switches(int argc, char *argv[], GucContext ctx, const char **dbname); extern void PostgresMain(int argc, char *argv[], diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile index 7680d49cb6..9895a6e869 100644 --- a/src/pl/plpython/Makefile +++ b/src/pl/plpython/Makefile @@ -20,6 +20,7 @@ PGFILEDESC = "PL/Python - procedural language" NAME = plpython$(python_majorversion) OBJS = \ + plpy_bgsession.o \ plpy_cursorobject.o \ plpy_elog.o \ plpy_exec.o \ @@ -89,6 +90,7 @@ REGRESS = \ plpython_quote \ plpython_composite \ plpython_subtransaction \ + plpython_bgsession \ plpython_drop REGRESS_PLPYTHON3_MANGLE := $(REGRESS) diff --git a/src/pl/plpython/expected/plpython_bgsession.out b/src/pl/plpython/expected/plpython_bgsession.out new file mode 100644 index 0000000000..5b32eab31b --- /dev/null +++ b/src/pl/plpython/expected/plpython_bgsession.out @@ -0,0 +1,217 @@ +CREATE TABLE test1 (a int, b text); +-- test of independent commit/rollback +CREATE FUNCTION bgsession_test1() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + for i in range(0, 10): + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + a.execute("COMMIT") + else: + a.execute("ROLLBACK") +$$; +SELECT bgsession_test1(); + bgsession_test1 +----------------- + +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +-- test query result +CREATE FUNCTION bgsession_test2() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (11)") + rv = a.execute("SELECT * FROM test1") + plpy.info(rv) + a.execute("ROLLBACK") +$$; +SELECT bgsession_test2(); +INFO: <PLyResult status=0 nrows=6 rows=[{'a': 0, 'b': None}, {'a': 2, 'b': None}, {'a': 4, 'b': None}, {'a': 6, 'b': None}, {'a': 8, 'b': None}, {'a': 11, 'b': None}]> + bgsession_test2 +----------------- + +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +-- test notice and error forwarding +CREATE FUNCTION bgsession_test3() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$") + a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$") +$$; +SELECT bgsession_test3(); +NOTICE: notice +ERROR: plpy.Error: error +CONTEXT: Traceback (most recent call last): + PL/Python function "bgsession_test3", line 4, in <module> + a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$") +PL/Python function "bgsession_test3" +-- test prohibit changing client encoding +CREATE FUNCTION bgsession_test4() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("SET client_encoding TO SJIS") +$$; +SELECT bgsession_test4(); +ERROR: plpy.Error: cannot set client encoding in background session +CONTEXT: Traceback (most recent call last): + PL/Python function "bgsession_test4", line 3, in <module> + a.execute("SET client_encoding TO SJIS") +PL/Python function "bgsession_test4" +-- test prepared statements +CREATE FUNCTION bgsession_test5() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"]) + a.execute_prepared(plan, [1, "one"]) + a.execute_prepared(plan, [2, "two"]) +$$; +TRUNCATE test1; +SELECT bgsession_test5(); + bgsession_test5 +----------------- + +(1 row) + +SELECT * FROM test1; + a | b +---+----- + 1 | one + 2 | two +(2 rows) + +-- test result from prepared query +CREATE FUNCTION bgsession_test7() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"]) + a.execute_prepared(plan, [11]) + plan = a.prepare("SELECT * FROM test1") + rv = a.execute_prepared(plan, []) + plpy.info(rv) + a.execute("ROLLBACK") +$$; +TRUNCATE test1; +SELECT bgsession_test7(); +INFO: <PLyResult status=0 nrows=1 rows=[{'a': 11, 'b': None}]> + bgsession_test7 +----------------- + +(1 row) + +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +-- test error when not closing transaction +CREATE FUNCTION bgsession_test8() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") +$$; +SELECT bgsession_test8(); +ERROR: background session ended with transaction block open +CONTEXT: PL/Python function "bgsession_test8" +-- test saving session +CREATE FUNCTION bgsession_test9a() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = plpy.BackgroundSession() +GD['bg'] = bg +bg.execute("BEGIN") +bg.execute("INSERT INTO test1 VALUES (1)") + +return 1 +$$; +CREATE FUNCTION bgsession_test9b() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = GD['bg'] +bg.execute("INSERT INTO test1 VALUES (2)") +bg.execute("COMMIT") +bg.close() + +return 2 +$$; +TRUNCATE test1; +SELECT bgsession_test9a(); + bgsession_test9a +------------------ + 1 +(1 row) + +SELECT bgsession_test9b(); + bgsession_test9b +------------------ + 2 +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 1 | + 2 | +(2 rows) + +-- test error handling +CREATE FUNCTION bgsession_test10() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as bg: + try: + bg.execute("SELECT error") + except Exception, ex: + plpy.notice("caught exception: %s" % ex) + + try: + bg.prepare("SELECT error") + except Exception, ex: + plpy.notice("caught exception: %s" % ex) + + try: + plan = bg.prepare("SELECT $1", ["int4"]) + bg.execute_prepared(plan, ["foo"]) + except plpy.Error, ex: + plpy.notice("caught exception: %s" % ex) +$$; +SELECT bgsession_test10(); +NOTICE: caught exception: column "error" does not exist +NOTICE: caught exception: column "error" does not exist +NOTICE: caught exception: invalid input syntax for integer: "foo" + bgsession_test10 +------------------ + +(1 row) + +DROP TABLE test1; diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index 847e4cc412..2001b60fa4 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -43,8 +43,9 @@ contents.sort() return contents $$ LANGUAGE plpythonu; select module_contents(); - module_contents ------------------ + module_contents +------------------- + BackgroundSession Error Fatal SPIError @@ -63,7 +64,7 @@ select module_contents(); spiexceptions subtransaction warning -(18 rows) +(19 rows) CREATE FUNCTION elog_test_basic() RETURNS void AS $$ diff --git a/src/pl/plpython/plpy_bgsession.c b/src/pl/plpython/plpy_bgsession.c new file mode 100644 index 0000000000..47d299f73c --- /dev/null +++ b/src/pl/plpython/plpy_bgsession.c @@ -0,0 +1,509 @@ +/* + * the PLyBackgroundSession class + * + * src/pl/plpython/plpy_bgsession.c + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "mb/pg_wchar.h" +#include "parser/parse_type.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +#include "plpython.h" + +#include "plpy_bgsession.h" + +#include "plpy_elog.h" +#include "plpy_main.h" +#include "plpy_planobject.h" +#include "plpy_spi.h" + + +static PyObject *PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw); +static void PLyBackgroundSession_dealloc(PyObject *subxact); +static PyObject *PLyBackgroundSession_enter(PyObject *self, PyObject *unused); +static PyObject *PLyBackgroundSession_close(PyObject *self, PyObject *args); +static PyObject *PLyBackgroundSession_execute(PyObject *self, PyObject *args); +static PyObject *PLyBackgroundSession_prepare(PyObject *self, PyObject *args); +static PyObject *PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args); + +static char PLyBackgroundSession_doc[] = { + "PostgreSQL background session context manager" +}; + +static PyMethodDef PLyBackgroundSession_methods[] = { + {"close", PLyBackgroundSession_close, METH_VARARGS, NULL}, + {"__enter__", PLyBackgroundSession_enter, METH_VARARGS, NULL}, + {"__exit__", PLyBackgroundSession_close, METH_VARARGS, NULL}, + {"execute", PLyBackgroundSession_execute, METH_VARARGS, NULL}, + {"prepare", PLyBackgroundSession_prepare, METH_VARARGS, NULL}, + {"execute_prepared", PLyBackgroundSession_execute_prepared, METH_VARARGS, NULL}, + {NULL, NULL, 0, NULL} +}; + +static PyTypeObject PLyBackgroundSession_Type = { + PyVarObject_HEAD_INIT(NULL, 0) + "plpy.BackgroundSession", /* tp_name */ + sizeof(PLyBackgroundSession_Object), /* tp_size */ + 0, /* tp_itemsize */ + + /* + * methods + */ + PLyBackgroundSession_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */ + PLyBackgroundSession_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + PLyBackgroundSession_methods, /* tp_tpmethods */ + 0, /* tp_members */ + 0, /* tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + PLyBackgroundSession_new, /* tp_new */ + 0, /* tp_free */ +}; + + +int +PLy_bgsession_init_type(PyObject *module) +{ + if (PyType_Ready(&PLyBackgroundSession_Type) < 0) + return -1; + + Py_INCREF(&PLyBackgroundSession_Type); + if (PyModule_AddObject(module, "BackgroundSession", (PyObject *)&PLyBackgroundSession_Type) < 0) + return -1; + + return 0; +} + +static PyObject * +PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw) +{ + PyObject *result = type->tp_alloc(type, 0); + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) result; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + bgsession->bgsession = BackgroundSessionStart(); + MemoryContextSwitchTo(oldcontext); + + return result; +} + +/* + * Python requires a dealloc function to be defined + */ +static void +PLyBackgroundSession_dealloc(PyObject *self) +{ +} + +/* + * bgsession.__enter__() or bgsession.enter() + */ +static PyObject * +PLyBackgroundSession_enter(PyObject *self, PyObject *unused) +{ + Py_INCREF(self); + return self; +} + +/* + * bgsession.close() or bgsession.__exit__(exc_type, exc, tb) + */ +static PyObject * +PLyBackgroundSession_close(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + BackgroundSessionEnd(bgsession->bgsession); + bgsession->bgsession = NULL; + + Py_INCREF(Py_None); + return Py_None; +} + +/* + * Use in PG_CATCH() clause to set Python exception from error data. + */ +static void +PLy_bgsession_handle_error(MemoryContext oldcontext) +{ + ErrorData *edata; + + MemoryContextSwitchTo(oldcontext); + edata = CopyErrorData(); + FlushErrorState(); + PLy_exception_set_with_details(PLy_exc_error, edata); + FreeErrorData(edata); +} + +static PyObject * +PLyBackgroundSession_execute(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self; + char *query; + volatile MemoryContext oldcontext; + PyObject *ret; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + if (!PyArg_ParseTuple(args, "s:execute", &query)) + { + PLy_exception_set(PLy_exc_error, "background session execute expected a query"); + return NULL; + } + + oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + BackgroundSessionResult *result; + + pg_verifymbstr(query, strlen(query), false); + result = BackgroundSessionExecute(bgsession->bgsession, query); + ret = PLy_execute_fetch_result(result->tuples, result->tupdesc, result->ntuples, 0); + } + PG_CATCH(); + { + PLy_bgsession_handle_error(oldcontext); + return NULL; + } + PG_END_TRY(); + + return ret; +} + +// XXX lots of overlap with PLy_spi_prepare +static PyObject * +PLyBackgroundSession_prepare(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self; + char *query; + PyObject *paraminfo = NULL; + int nargs = 0; + const char **argnames = NULL; + PyObject *keys; + PLyPlanObject *plan; + volatile MemoryContext oldcontext; + PyObject *volatile optr = NULL; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + if (!PyArg_ParseTuple(args, "s|O:prepare", &query, ¶minfo)) + return NULL; + + if (paraminfo && + !PySequence_Check(paraminfo) && !PyMapping_Check(paraminfo)) + { + PLy_exception_set(PyExc_TypeError, + "second argument of prepare must be a sequence or mapping"); + return NULL; + } + + if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL) + return NULL; + + plan->mcxt = AllocSetContextCreate(TopMemoryContext, + "PL/Python background plan context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(plan->mcxt); + + if (!paraminfo) + nargs = 0; + else if (PySequence_Check(paraminfo)) + nargs = PySequence_Length(paraminfo); + else + nargs = PyMapping_Length(paraminfo); + + plan->nargs = nargs; + plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL; + plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL; + plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL; + + if (PyMapping_Check(paraminfo)) + { + argnames = palloc(nargs * sizeof(char *)); + keys = PyMapping_Keys(paraminfo); + } + else + { + argnames = NULL; + keys = NULL; + } + + MemoryContextSwitchTo(oldcontext); + + oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + int i; + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + + for (i = 0; i < nargs; i++) + { + PLy_typeinfo_init(&plan->args[i], plan->mcxt); + plan->values[i] = PointerGetDatum(NULL); + } + + for (i = 0; i < nargs; i++) + { + char *sptr; + HeapTuple typeTup; + Oid typeId; + int32 typmod; + + if (keys) + { + PyObject *key; + char *keystr; + + key = PySequence_GetItem(keys, i); + argnames[i] = keystr = PyString_AsString(key); + optr = PyMapping_GetItemString(paraminfo, keystr); + Py_DECREF(key); + } + else + optr = PySequence_GetItem(paraminfo, i); + + if (PyString_Check(optr)) + sptr = PyString_AsString(optr); + else if (PyUnicode_Check(optr)) + sptr = PLyUnicode_AsString(optr); + else + { + ereport(ERROR, + (errmsg("background session prepare: type name at ordinal position %d is not a string", i))); + sptr = NULL; /* keep compiler quiet */ + } + + /******************************************************** + * Resolve argument type names and then look them up by + * oid in the system cache, and remember the required + *information for input conversion. + ********************************************************/ + + parseTypeString(sptr, &typeId, &typmod, false); + + typeTup = SearchSysCache1(TYPEOID, + ObjectIdGetDatum(typeId)); + if (!HeapTupleIsValid(typeTup)) + elog(ERROR, "cache lookup failed for type %u", typeId); + + Py_DECREF(optr); + + /* + * set optr to NULL, so we won't try to unref it again in case of + * an error + */ + optr = NULL; + + plan->types[i] = typeId; + PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes); + ReleaseSysCache(typeTup); + } + + pg_verifymbstr(query, strlen(query), false); + plan->bgstmt = BackgroundSessionPrepare(bgsession->bgsession, query, nargs, plan->types, argnames); + } + PG_CATCH(); + { + Py_DECREF(plan); + Py_XDECREF(optr); + + PLy_bgsession_handle_error(oldcontext); + return NULL; + } + PG_END_TRY(); + + Assert(plan->bgstmt != NULL); + return (PyObject *) plan; +} + +static PyObject * +PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession pg_attribute_unused() = (PLyBackgroundSession_Object *) self; + PyObject *ob; + PLyPlanObject *plan; + PyObject *list = NULL; + int nargs; + volatile MemoryContext oldcontext; + int i; + PyObject *ret; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + if (!PyArg_ParseTuple(args, "O|O:execute_prepared", &ob, &list)) + return NULL; + + if (!is_PLyPlanObject(ob)) + { + PLy_exception_set(PyExc_TypeError, + "first argument of execute_prepared must be a plan"); + return NULL; + } + + plan = (PLyPlanObject *) ob; + + if (list && (!PySequence_Check(list))) + { + PLy_exception_set(PyExc_TypeError, + "second argument of execute_prepared must be a sequence"); + return NULL; + } + + nargs = list ? PySequence_Length(list) : 0; + + if (nargs != plan->nargs) + { + char *sv; + PyObject *so = PyObject_Str(list); + + if (!so) + PLy_elog(ERROR, "could not execute plan"); + sv = PyString_AsString(so); + PLy_exception_set_plural(PyExc_TypeError, + "Expected sequence of %d argument, got %d: %s", + "Expected sequence of %d arguments, got %d: %s", + plan->nargs, + plan->nargs, nargs, sv); + Py_DECREF(so); + + return NULL; + } + + oldcontext = CurrentMemoryContext; + + PG_TRY(); + { + bool *nulls; + BackgroundSessionResult *result; + + nulls = palloc(nargs * sizeof(*nulls)); + + for (i = 0; i < nargs; i++) + { + PyObject *elem; + + elem = PySequence_GetItem(list, i); + if (elem != Py_None) + { + PG_TRY(); + { + plan->values[i] = + plan->args[i].out.d.func(&(plan->args[i].out.d), + -1, + elem, + false); + } + PG_CATCH(); + { + Py_DECREF(elem); + PG_RE_THROW(); + } + PG_END_TRY(); + + Py_DECREF(elem); + nulls[i] = false; + } + else + { + Py_DECREF(elem); + plan->values[i] = + InputFunctionCall(&(plan->args[i].out.d.typfunc), + NULL, + plan->args[i].out.d.typioparam, + -1); + nulls[i] = true; + } + } + + result = BackgroundSessionExecutePrepared(plan->bgstmt, nargs, plan->values, nulls); + pfree(nulls); + ret = PLy_execute_fetch_result(result->tuples, result->tupdesc, result->ntuples, 0); + } + PG_CATCH(); + { + int k; + + /* + * cleanup plan->values array + */ + for (k = 0; k < nargs; k++) + { + if (!plan->args[k].out.d.typbyval && + (plan->values[k] != PointerGetDatum(NULL))) + { + pfree(DatumGetPointer(plan->values[k])); + plan->values[k] = PointerGetDatum(NULL); + } + } + + PLy_bgsession_handle_error(oldcontext); + return NULL; + } + PG_END_TRY(); + + for (i = 0; i < nargs; i++) + { + if (!plan->args[i].out.d.typbyval && + (plan->values[i] != PointerGetDatum(NULL))) + { + pfree(DatumGetPointer(plan->values[i])); + plan->values[i] = PointerGetDatum(NULL); + } + } + + return ret; +} diff --git a/src/pl/plpython/plpy_bgsession.h b/src/pl/plpython/plpy_bgsession.h new file mode 100644 index 0000000000..39daca2d39 --- /dev/null +++ b/src/pl/plpython/plpy_bgsession.h @@ -0,0 +1,18 @@ +/* + * src/pl/plpython/plpy_bgsession.h + */ + +#ifndef PLPY_BGSESSION_H +#define PLPY_BGSESSION_H + +#include "tcop/bgsession.h" + +typedef struct PLyBackgroundSession_Object +{ + PyObject_HEAD + BackgroundSession *bgsession; +} PLyBackgroundSession_Object; + +extern int PLy_bgsession_init_type(PyObject *module); + +#endif /* PLPY_BGSESSION_H */ diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h index 10426c4323..b7ee34c3f2 100644 --- a/src/pl/plpython/plpy_main.h +++ b/src/pl/plpython/plpy_main.h @@ -7,6 +7,8 @@ #include "plpy_procedure.h" +#include "tcop/bgsession.h" + /* the interpreter's globals dict */ extern PyObject *PLy_interp_globals; @@ -19,6 +21,7 @@ typedef struct PLyExecutionContext { PLyProcedure *curr_proc; /* the currently executing procedure */ MemoryContext scratch_ctx; /* a context for things like type I/O */ + BackgroundSession *bgsession; struct PLyExecutionContext *next; /* previous stack level */ } PLyExecutionContext; diff --git a/src/pl/plpython/plpy_planobject.c b/src/pl/plpython/plpy_planobject.c index 16c39a05dd..006a3afd80 100644 --- a/src/pl/plpython/plpy_planobject.c +++ b/src/pl/plpython/plpy_planobject.c @@ -77,6 +77,7 @@ PLy_plan_new(void) return NULL; ob->plan = NULL; + ob->bgstmt = NULL; ob->nargs = 0; ob->types = NULL; ob->values = NULL; diff --git a/src/pl/plpython/plpy_planobject.h b/src/pl/plpython/plpy_planobject.h index c67559266e..fba814ad8a 100644 --- a/src/pl/plpython/plpy_planobject.h +++ b/src/pl/plpython/plpy_planobject.h @@ -6,6 +6,7 @@ #define PLPY_PLANOBJECT_H #include "executor/spi.h" +#include "tcop/bgsession.h" #include "plpy_typeio.h" @@ -13,6 +14,7 @@ typedef struct PLyPlanObject { PyObject_HEAD SPIPlanPtr plan; + BackgroundSessionPreparedStatement *bgstmt; int nargs; Oid *types; Datum *values; diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c index 761534406d..500c0931f9 100644 --- a/src/pl/plpython/plpy_plpymodule.c +++ b/src/pl/plpython/plpy_plpymodule.c @@ -13,6 +13,7 @@ #include "plpy_plpymodule.h" +#include "plpy_bgsession.h" #include "plpy_cursorobject.h" #include "plpy_elog.h" #include "plpy_planobject.h" @@ -137,6 +138,8 @@ PyInit_plpy(void) return NULL; PLy_add_exceptions(m); + if (PLy_bgsession_init_type(plpy) < 0) + return NULL; return m; } @@ -167,6 +170,8 @@ PLy_init_plpy(void) #else plpy = Py_InitModule("plpy", PLy_methods); PLy_add_exceptions(plpy); + if (PLy_bgsession_init_type(plpy) < 0) + PLy_elog(ERROR, "could not initialize BackgroundSession type"); #endif /* PyDict_SetItemString(plpy, "PlanType", (PyObject *) &PLy_PlanType); */ diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index 07ab6a087e..97b4ba05f9 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -31,8 +31,6 @@ static PyObject *PLy_spi_execute_query(char *query, long limit); static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit); -static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, - uint64 rows, int status); static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata); @@ -291,7 +289,10 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) rv = SPI_execute_plan(plan->plan, plan->values, nulls, exec_ctx->curr_proc->fn_readonly, limit); - ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + ret = PLy_execute_fetch_result(SPI_tuptable ? SPI_tuptable->vals : NULL, + SPI_tuptable ? SPI_tuptable->tupdesc : NULL, + SPI_processed, rv); + SPI_freetuptable(SPI_tuptable); if (nargs > 0) pfree(nulls); @@ -360,7 +361,10 @@ PLy_spi_execute_query(char *query, long limit) pg_verifymbstr(query, strlen(query), false); rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit); - ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + ret = PLy_execute_fetch_result(SPI_tuptable ? SPI_tuptable->vals : NULL, + SPI_tuptable ? SPI_tuptable->tupdesc : 0, + SPI_processed, rv); + SPI_freetuptable(SPI_tuptable); PLy_spi_subtransaction_commit(oldcontext, oldowner); } @@ -383,8 +387,8 @@ PLy_spi_execute_query(char *query, long limit) return ret; } -static PyObject * -PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) +PyObject * +PLy_execute_fetch_result(HeapTuple *tuples, TupleDesc tupdesc, uint64 rows, int status) { PLyResultObject *result; volatile MemoryContext oldcontext; @@ -393,14 +397,14 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) Py_DECREF(result->status); result->status = PyInt_FromLong(status); - if (status > 0 && tuptable == NULL) + if (status >= 0 && tuples == NULL) { Py_DECREF(result->nrows); result->nrows = (rows > (uint64) LONG_MAX) ? PyFloat_FromDouble((double) rows) : PyInt_FromLong((long) rows); } - else if (status > 0 && tuptable != NULL) + else if (status >= 0 && tuples != NULL) { PLyTypeInfo args; MemoryContext cxt; @@ -437,12 +441,12 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) Py_DECREF(result->rows); result->rows = PyList_New(rows); - PLy_input_tuple_funcs(&args, tuptable->tupdesc); + PLy_input_tuple_funcs(&args, tupdesc); for (i = 0; i < rows; i++) { PyObject *row = PLyDict_FromTuple(&args, - tuptable->vals[i], - tuptable->tupdesc); + tuples[i], + tupdesc); PyList_SetItem(result->rows, i, row); } @@ -457,7 +461,7 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) * leaked due to errors.) */ oldcontext2 = MemoryContextSwitchTo(TopMemoryContext); - result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc); + result->tupdesc = CreateTupleDescCopy(tupdesc); MemoryContextSwitchTo(oldcontext2); } PG_CATCH(); @@ -470,7 +474,6 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) PG_END_TRY(); MemoryContextDelete(cxt); - SPI_freetuptable(tuptable); } return (PyObject *) result; diff --git a/src/pl/plpython/plpy_spi.h b/src/pl/plpython/plpy_spi.h index b0427947ef..268c539838 100644 --- a/src/pl/plpython/plpy_spi.h +++ b/src/pl/plpython/plpy_spi.h @@ -5,12 +5,15 @@ #ifndef PLPY_SPI_H #define PLPY_SPI_H +#include "executor/spi.h" #include "utils/palloc.h" #include "utils/resowner.h" extern PyObject *PLy_spi_prepare(PyObject *self, PyObject *args); extern PyObject *PLy_spi_execute(PyObject *self, PyObject *args); +extern PyObject *PLy_execute_fetch_result(HeapTuple *tuples, TupleDesc tupdesc, uint64 rows, int status); + typedef struct PLyExceptionEntry { int sqlstate; /* hash key, must be first */ diff --git a/src/pl/plpython/sql/plpython_bgsession.sql b/src/pl/plpython/sql/plpython_bgsession.sql new file mode 100644 index 0000000000..7a801ab727 --- /dev/null +++ b/src/pl/plpython/sql/plpython_bgsession.sql @@ -0,0 +1,177 @@ +CREATE TABLE test1 (a int, b text); + + +-- test of independent commit/rollback + +CREATE FUNCTION bgsession_test1() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + for i in range(0, 10): + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + a.execute("COMMIT") + else: + a.execute("ROLLBACK") +$$; + +SELECT bgsession_test1(); + +SELECT * FROM test1; + + +-- test query result + +CREATE FUNCTION bgsession_test2() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (11)") + rv = a.execute("SELECT * FROM test1") + plpy.info(rv) + a.execute("ROLLBACK") +$$; + +SELECT bgsession_test2(); + +SELECT * FROM test1; + + +-- test notice and error forwarding + +CREATE FUNCTION bgsession_test3() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$") + a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$") +$$; + +SELECT bgsession_test3(); + + +-- test prohibit changing client encoding + +CREATE FUNCTION bgsession_test4() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("SET client_encoding TO SJIS") +$$; + +SELECT bgsession_test4(); + + +-- test prepared statements + +CREATE FUNCTION bgsession_test5() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"]) + a.execute_prepared(plan, [1, "one"]) + a.execute_prepared(plan, [2, "two"]) +$$; + +TRUNCATE test1; + +SELECT bgsession_test5(); + +SELECT * FROM test1; + + +-- test result from prepared query + +CREATE FUNCTION bgsession_test7() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"]) + a.execute_prepared(plan, [11]) + plan = a.prepare("SELECT * FROM test1") + rv = a.execute_prepared(plan, []) + plpy.info(rv) + a.execute("ROLLBACK") +$$; + +TRUNCATE test1; + +SELECT bgsession_test7(); + +SELECT * FROM test1; + + +-- test error when not closing transaction + +CREATE FUNCTION bgsession_test8() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") +$$; + +SELECT bgsession_test8(); + + +-- test saving session + +CREATE FUNCTION bgsession_test9a() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = plpy.BackgroundSession() +GD['bg'] = bg +bg.execute("BEGIN") +bg.execute("INSERT INTO test1 VALUES (1)") + +return 1 +$$; + +CREATE FUNCTION bgsession_test9b() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = GD['bg'] +bg.execute("INSERT INTO test1 VALUES (2)") +bg.execute("COMMIT") +bg.close() + +return 2 +$$; + +TRUNCATE test1; + +SELECT bgsession_test9a(); +SELECT bgsession_test9b(); + +SELECT * FROM test1; + + +-- test error handling + +CREATE FUNCTION bgsession_test10() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as bg: + try: + bg.execute("SELECT error") + except Exception, ex: + plpy.notice("caught exception: %s" % ex) + + try: + bg.prepare("SELECT error") + except Exception, ex: + plpy.notice("caught exception: %s" % ex) + + try: + plan = bg.prepare("SELECT $1", ["int4"]) + bg.execute_prepared(plan, ["foo"]) + except plpy.Error, ex: + plpy.notice("caught exception: %s" % ex) +$$; + +SELECT bgsession_test10(); + + +DROP TABLE test1; -- 2.12.0
>From 2e1fc0b0c50452bac91461a2317c28a8718fe89f Mon Sep 17 00:00:00 2001 From: Peter Eisentraut <pete...@gmx.net> Date: Sun, 25 Dec 2016 12:00:00 -0500 Subject: [PATCH v2 1/2] dblink: Replace some macros by static functions Also remove some unused code and the no longer useful dblink.h file. --- contrib/dblink/dblink.c | 290 +++++++++++++++++++++++------------------------- contrib/dblink/dblink.h | 39 ------- 2 files changed, 137 insertions(+), 192 deletions(-) delete mode 100644 contrib/dblink/dblink.h diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index e0d6778a08..67d6699066 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -61,8 +61,6 @@ #include "utils/tqual.h" #include "utils/varlena.h" -#include "dblink.h" - PG_MODULE_MAGIC; typedef struct remoteConn @@ -146,98 +144,102 @@ typedef struct remoteConnHashEnt /* initial number of connection hashes */ #define NUMCONN 16 -/* general utility */ -#define xpfree(var_) \ - do { \ - if (var_ != NULL) \ - { \ - pfree(var_); \ - var_ = NULL; \ - } \ - } while (0) - -#define xpstrdup(var_c, var_) \ - do { \ - if (var_ != NULL) \ - var_c = pstrdup(var_); \ - else \ - var_c = NULL; \ - } while (0) - -#define DBLINK_RES_INTERNALERROR(p2) \ - do { \ - msg = pchomp(PQerrorMessage(conn)); \ - if (res) \ - PQclear(res); \ - elog(ERROR, "%s: %s", p2, msg); \ - } while (0) - -#define DBLINK_CONN_NOT_AVAIL \ - do { \ - if(conname) \ - ereport(ERROR, \ - (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ - errmsg("connection \"%s\" not available", conname))); \ - else \ - ereport(ERROR, \ - (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), \ - errmsg("connection not available"))); \ - } while (0) - -#define DBLINK_GET_CONN \ - do { \ - char *conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); \ - rconn = getConnectionByName(conname_or_str); \ - if (rconn) \ - { \ - conn = rconn->conn; \ - conname = conname_or_str; \ - } \ - else \ - { \ - connstr = get_connect_string(conname_or_str); \ - if (connstr == NULL) \ - { \ - connstr = conname_or_str; \ - } \ - dblink_connstr_check(connstr); \ - conn = PQconnectdb(connstr); \ - if (PQstatus(conn) == CONNECTION_BAD) \ - { \ - msg = pchomp(PQerrorMessage(conn)); \ - PQfinish(conn); \ - ereport(ERROR, \ - (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), \ - errmsg("could not establish connection"), \ - errdetail_internal("%s", msg))); \ - } \ - dblink_security_check(conn, rconn); \ - if (PQclientEncoding(conn) != GetDatabaseEncoding()) \ - PQsetClientEncoding(conn, GetDatabaseEncodingName()); \ - freeconn = true; \ - } \ - } while (0) - -#define DBLINK_GET_NAMED_CONN \ - do { \ - conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); \ - rconn = getConnectionByName(conname); \ - if (rconn) \ - conn = rconn->conn; \ - else \ - DBLINK_CONN_NOT_AVAIL; \ - } while (0) - -#define DBLINK_INIT \ - do { \ - if (!pconn) \ - { \ - pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); \ - pconn->conn = NULL; \ - pconn->openCursorCount = 0; \ - pconn->newXactForCursor = FALSE; \ - } \ - } while (0) +static char * +xpstrdup(const char *in) +{ + if (in == NULL) + return NULL; + return pstrdup(in); +} + +static void +dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2) +{ + char *msg = pchomp(PQerrorMessage(conn)); + if (res) + PQclear(res); + elog(ERROR, "%s: %s", p2, msg); +} + +static void pg_attribute_noreturn() +dblink_conn_not_avail(const char *conname) +{ + if (conname) + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("connection \"%s\" not available", conname))); + else + ereport(ERROR, + (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST), + errmsg("connection not available"))); +} + +static void +dblink_get_conn(char *conname_or_str, + PGconn * volatile *conn_p, char **conname_p, volatile bool *freeconn_p) +{ + remoteConn *rconn = getConnectionByName(conname_or_str); + PGconn *conn; + char *conname; + bool freeconn; + + if (rconn) + { + conn = rconn->conn; + conname = conname_or_str; + freeconn = false; + } + else + { + const char *connstr; + + connstr = get_connect_string(conname_or_str); + if (connstr == NULL) + connstr = conname_or_str; + dblink_connstr_check(connstr); + conn = PQconnectdb(connstr); + if (PQstatus(conn) == CONNECTION_BAD) + { + char *msg = pchomp(PQerrorMessage(conn)); + PQfinish(conn); + ereport(ERROR, + (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION), + errmsg("could not establish connection"), + errdetail_internal("%s", msg))); + } + dblink_security_check(conn, rconn); + if (PQclientEncoding(conn) != GetDatabaseEncoding()) + PQsetClientEncoding(conn, GetDatabaseEncodingName()); + freeconn = true; + conname = NULL; + } + + *conn_p = conn; + *conname_p = conname; + *freeconn_p = freeconn; +} + +static PGconn * +dblink_get_named_conn(const char *conname) +{ + remoteConn *rconn = getConnectionByName(conname); + if (rconn) + return rconn->conn; + else + dblink_conn_not_avail(conname); +} + +static void +dblink_init(void) +{ + if (!pconn) + { + pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); + pconn->conn = NULL; + pconn->openCursorCount = 0; + pconn->newXactForCursor = FALSE; + } +} /* * Create a persistent connection to another database @@ -253,7 +255,7 @@ dblink_connect(PG_FUNCTION_ARGS) PGconn *conn = NULL; remoteConn *rconn = NULL; - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 2) { @@ -318,7 +320,7 @@ dblink_disconnect(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; PGconn *conn = NULL; - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 1) { @@ -331,7 +333,7 @@ dblink_disconnect(PG_FUNCTION_ARGS) conn = pconn->conn; if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); PQfinish(conn); if (rconn) @@ -352,7 +354,6 @@ PG_FUNCTION_INFO_V1(dblink_open); Datum dblink_open(PG_FUNCTION_ARGS) { - char *msg; PGresult *res = NULL; PGconn *conn = NULL; char *curname = NULL; @@ -362,7 +363,7 @@ dblink_open(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ - DBLINK_INIT; + dblink_init(); initStringInfo(&buf); if (PG_NARGS() == 2) @@ -401,7 +402,7 @@ dblink_open(PG_FUNCTION_ARGS) } if (!rconn || !rconn->conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); else conn = rconn->conn; @@ -410,7 +411,7 @@ dblink_open(PG_FUNCTION_ARGS) { res = PQexec(conn, "BEGIN"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("begin error"); + dblink_res_internalerror(conn, res, "begin error"); PQclear(res); rconn->newXactForCursor = TRUE; @@ -450,11 +451,10 @@ dblink_close(PG_FUNCTION_ARGS) char *curname = NULL; char *conname = NULL; StringInfoData buf; - char *msg; remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ - DBLINK_INIT; + dblink_init(); initStringInfo(&buf); if (PG_NARGS() == 1) @@ -489,7 +489,7 @@ dblink_close(PG_FUNCTION_ARGS) } if (!rconn || !rconn->conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); else conn = rconn->conn; @@ -517,7 +517,7 @@ dblink_close(PG_FUNCTION_ARGS) res = PQexec(conn, "COMMIT"); if (PQresultStatus(res) != PGRES_COMMAND_OK) - DBLINK_RES_INTERNALERROR("commit error"); + dblink_res_internalerror(conn, res, "commit error"); PQclear(res); } } @@ -543,7 +543,7 @@ dblink_fetch(PG_FUNCTION_ARGS) prepTuplestoreResult(fcinfo); - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 4) { @@ -587,7 +587,7 @@ dblink_fetch(PG_FUNCTION_ARGS) } if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); initStringInfo(&buf); appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); @@ -633,15 +633,13 @@ PG_FUNCTION_INFO_V1(dblink_send_query); Datum dblink_send_query(PG_FUNCTION_ARGS) { - char *conname = NULL; - PGconn *conn = NULL; - char *sql = NULL; - remoteConn *rconn = NULL; + PGconn *conn; + char *sql; int retval; if (PG_NARGS() == 2) { - DBLINK_GET_NAMED_CONN; + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } else @@ -671,15 +669,12 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) prepTuplestoreResult(fcinfo); - DBLINK_INIT; + dblink_init(); PG_TRY(); { - char *msg; - char *connstr = NULL; char *sql = NULL; char *conname = NULL; - remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible */ if (!is_async) @@ -687,7 +682,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (PG_NARGS() == 3) { /* text,text,bool */ - DBLINK_GET_CONN; + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); } @@ -702,7 +697,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } else { - DBLINK_GET_CONN; + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } } @@ -722,13 +717,13 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (PG_NARGS() == 2) { /* text,bool */ - DBLINK_GET_NAMED_CONN; + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); fail = PG_GETARG_BOOL(1); } else if (PG_NARGS() == 1) { /* text */ - DBLINK_GET_NAMED_CONN; + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); } else /* shouldn't happen */ @@ -736,7 +731,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) } if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); if (!is_async) { @@ -1297,12 +1292,10 @@ PG_FUNCTION_INFO_V1(dblink_is_busy); Datum dblink_is_busy(PG_FUNCTION_ARGS) { - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + PGconn *conn; - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; + dblink_init(); + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); PQconsumeInput(conn); PG_RETURN_INT32(PQisBusy(conn)); @@ -1323,15 +1316,13 @@ PG_FUNCTION_INFO_V1(dblink_cancel_query); Datum dblink_cancel_query(PG_FUNCTION_ARGS) { - int res = 0; - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + int res; + PGconn *conn; PGcancel *cancel; char errbuf[256]; - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; + dblink_init(); + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); cancel = PQgetCancel(conn); res = PQcancel(cancel, errbuf, 256); @@ -1359,12 +1350,10 @@ Datum dblink_error_message(PG_FUNCTION_ARGS) { char *msg; - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + PGconn *conn; - DBLINK_INIT; - DBLINK_GET_NAMED_CONN; + dblink_init(); + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); msg = PQerrorMessage(conn); if (msg == NULL || msg[0] == '\0') @@ -1384,22 +1373,19 @@ dblink_exec(PG_FUNCTION_ARGS) PGconn *volatile conn = NULL; volatile bool freeconn = false; - DBLINK_INIT; + dblink_init(); PG_TRY(); { - char *msg; PGresult *res = NULL; - char *connstr = NULL; char *sql = NULL; char *conname = NULL; - remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ if (PG_NARGS() == 3) { /* must be text,text,bool */ - DBLINK_GET_CONN; + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); } @@ -1414,7 +1400,7 @@ dblink_exec(PG_FUNCTION_ARGS) } else { - DBLINK_GET_CONN; + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } } @@ -1429,7 +1415,7 @@ dblink_exec(PG_FUNCTION_ARGS) elog(ERROR, "wrong number of arguments"); if (!conn) - DBLINK_CONN_NOT_AVAIL; + dblink_conn_not_avail(conname); res = PQexec(conn, sql); if (!res || @@ -1880,9 +1866,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify); Datum dblink_get_notify(PG_FUNCTION_ARGS) { - char *conname = NULL; - PGconn *conn = NULL; - remoteConn *rconn = NULL; + PGconn *conn; PGnotify *notify; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; @@ -1892,9 +1876,9 @@ dblink_get_notify(PG_FUNCTION_ARGS) prepTuplestoreResult(fcinfo); - DBLINK_INIT; + dblink_init(); if (PG_NARGS() == 1) - DBLINK_GET_NAMED_CONN; + conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); else conn = pconn->conn; @@ -2698,10 +2682,10 @@ dblink_res_error(PGconn *conn, const char *conname, PGresult *res, else sqlstate = ERRCODE_CONNECTION_FAILURE; - xpstrdup(message_primary, pg_diag_message_primary); - xpstrdup(message_detail, pg_diag_message_detail); - xpstrdup(message_hint, pg_diag_message_hint); - xpstrdup(message_context, pg_diag_context); + message_primary = xpstrdup(pg_diag_message_primary); + message_detail = xpstrdup(pg_diag_message_detail); + message_hint = xpstrdup(pg_diag_message_hint); + message_context = xpstrdup(pg_diag_context); /* * If we don't get a message from the PGresult, try the PGconn. This diff --git a/contrib/dblink/dblink.h b/contrib/dblink/dblink.h deleted file mode 100644 index c96dbe28a8..0000000000 --- a/contrib/dblink/dblink.h +++ /dev/null @@ -1,39 +0,0 @@ -/* - * dblink.h - * - * Functions returning results from a remote database - * - * Joe Conway <m...@joeconway.com> - * And contributors: - * Darko Prenosil <darko.preno...@finteh.hr> - * Shridhar Daithankar <shridhar_daithan...@persistent.co.in> - * - * contrib/dblink/dblink.h - * Copyright (c) 2001-2017, PostgreSQL Global Development Group - * ALL RIGHTS RESERVED; - * - * Permission to use, copy, modify, and distribute this software and its - * documentation for any purpose, without fee, and without a written agreement - * is hereby granted, provided that the above copyright notice and this - * paragraph and the following two paragraphs appear in all copies. - * - * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR - * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING - * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS - * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE - * POSSIBILITY OF SUCH DAMAGE. - * - * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES, - * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY - * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS - * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO - * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. - * - */ - -#ifndef DBLINK_H -#define DBLINK_H - -#include "fmgr.h" - -#endif /* DBLINK_H */ -- 2.12.0
>From 16f77685f7701790191b4f8d9a3cbce60604d397 Mon Sep 17 00:00:00 2001 From: Peter Eisentraut <pete...@gmx.net> Date: Tue, 28 Feb 2017 14:58:07 -0500 Subject: [PATCH v2 2/2] dblink: Add background sessions support Add function dblink_connect_self(), which creates a dblink connection via the background session API, instead of libpq. After that, it can be used transparently as a dblink connection, with some restrictions. This allows the typical use case of using dblink for self-connections without having to deal with access control issues of a libpq connection. --- contrib/dblink/Makefile | 1 + contrib/dblink/dblink--1.2--1.3.sql | 14 + contrib/dblink/dblink.c | 557 ++++++++++++++++++++++-------------- contrib/dblink/dblink.control | 2 +- contrib/dblink/expected/dblink.out | 95 ++++++ contrib/dblink/sql/dblink.sql | 20 ++ doc/src/sgml/dblink.sgml | 37 +++ 7 files changed, 516 insertions(+), 210 deletions(-) create mode 100644 contrib/dblink/dblink--1.2--1.3.sql diff --git a/contrib/dblink/Makefile b/contrib/dblink/Makefile index 5189758dab..106039da3b 100644 --- a/contrib/dblink/Makefile +++ b/contrib/dblink/Makefile @@ -7,6 +7,7 @@ SHLIB_LINK = $(libpq) EXTENSION = dblink DATA = dblink--1.2.sql dblink--1.1--1.2.sql dblink--1.0--1.1.sql \ + dblink--1.2--1.3.sql \ dblink--unpackaged--1.0.sql PGFILEDESC = "dblink - connect to other PostgreSQL databases" diff --git a/contrib/dblink/dblink--1.2--1.3.sql b/contrib/dblink/dblink--1.2--1.3.sql new file mode 100644 index 0000000000..b863613a3c --- /dev/null +++ b/contrib/dblink/dblink--1.2--1.3.sql @@ -0,0 +1,14 @@ +/* contrib/dblink/dblink--1.2--1.3.sql */ + +-- complain if script is sourced in psql, rather than via ALTER EXTENSION +\echo Use "ALTER EXTENSION dblink UPDATE TO '1.3'" to load this file. \quit + +CREATE FUNCTION dblink_connect_self () +RETURNS text +AS 'MODULE_PATHNAME','dblink_connect_self' +LANGUAGE C STRICT PARALLEL RESTRICTED; + +CREATE FUNCTION dblink_connect_self (text) +RETURNS text +AS 'MODULE_PATHNAME','dblink_connect_self' +LANGUAGE C STRICT PARALLEL RESTRICTED; diff --git a/contrib/dblink/dblink.c b/contrib/dblink/dblink.c index 67d6699066..6cb1a1e2e6 100644 --- a/contrib/dblink/dblink.c +++ b/contrib/dblink/dblink.c @@ -51,6 +51,7 @@ #include "mb/pg_wchar.h" #include "miscadmin.h" #include "parser/scansup.h" +#include "tcop/bgsession.h" #include "utils/acl.h" #include "utils/builtins.h" #include "utils/fmgroids.h" @@ -66,6 +67,7 @@ PG_MODULE_MAGIC; typedef struct remoteConn { PGconn *conn; /* Hold the remote connection */ + BackgroundSession *bgconn; int openCursorCount; /* The number of open cursors */ bool newXactForCursor; /* Opened a transaction for a cursor */ } remoteConn; @@ -89,6 +91,7 @@ static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async); static void prepTuplestoreResult(FunctionCallInfo fcinfo); static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res); +static void materializeBgResult(FunctionCallInfo fcinfo, BackgroundSession *bgconn, BackgroundSessionResult *res); static void materializeQueryResult(FunctionCallInfo fcinfo, PGconn *conn, const char *conname, @@ -111,7 +114,7 @@ static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumat static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode); static char *generate_relation_name(Relation rel); static void dblink_connstr_check(const char *connstr); -static void dblink_security_check(PGconn *conn, remoteConn *rconn); +static void dblink_security_check(remoteConn *rconn); static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res, const char *dblink_context_msg, bool fail); static char *get_connect_string(const char *servername); @@ -176,22 +179,21 @@ dblink_conn_not_avail(const char *conname) static void dblink_get_conn(char *conname_or_str, - PGconn * volatile *conn_p, char **conname_p, volatile bool *freeconn_p) + remoteConn * volatile *rconn_p, char **conname_p, volatile bool *freeconn_p) { remoteConn *rconn = getConnectionByName(conname_or_str); - PGconn *conn; char *conname; bool freeconn; if (rconn) { - conn = rconn->conn; conname = conname_or_str; freeconn = false; } else { const char *connstr; + PGconn *conn; connstr = get_connect_string(conname_or_str); if (connstr == NULL) @@ -207,38 +209,52 @@ dblink_get_conn(char *conname_or_str, errmsg("could not establish connection"), errdetail_internal("%s", msg))); } - dblink_security_check(conn, rconn); if (PQclientEncoding(conn) != GetDatabaseEncoding()) PQsetClientEncoding(conn, GetDatabaseEncodingName()); + rconn = palloc0(sizeof(*rconn)); + rconn->conn = conn; freeconn = true; conname = NULL; + dblink_security_check(rconn); } - *conn_p = conn; + *rconn_p = rconn; *conname_p = conname; *freeconn_p = freeconn; } -static PGconn * +static void +dblink_finish_conn(remoteConn *rconn) +{ + if (rconn->conn) + { + PQfinish(rconn->conn); + rconn->conn = NULL; + } + if (rconn->bgconn) + { + BackgroundSessionEnd(rconn->bgconn); + rconn->bgconn = NULL; + } +} + +static remoteConn * dblink_get_named_conn(const char *conname) { remoteConn *rconn = getConnectionByName(conname); if (rconn) - return rconn->conn; + return rconn; else dblink_conn_not_avail(conname); } static void -dblink_init(void) +dblink_bgsession_not_supported(const remoteConn *rconn) { - if (!pconn) - { - pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn)); - pconn->conn = NULL; - pconn->openCursorCount = 0; - pconn->newXactForCursor = FALSE; - } + if (!rconn->conn) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("not supported with background session connection"))); } /* @@ -255,8 +271,6 @@ dblink_connect(PG_FUNCTION_ARGS) PGconn *conn = NULL; remoteConn *rconn = NULL; - dblink_init(); - if (PG_NARGS() == 2) { conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1)); @@ -265,8 +279,7 @@ dblink_connect(PG_FUNCTION_ARGS) else if (PG_NARGS() == 1) conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0)); - if (connname) - rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, + rconn = (remoteConn *) MemoryContextAllocZero(TopMemoryContext, sizeof(remoteConn)); /* first check for valid foreign data server */ @@ -292,19 +305,46 @@ dblink_connect(PG_FUNCTION_ARGS) } /* check password actually used if not superuser */ - dblink_security_check(conn, rconn); + dblink_security_check(rconn); /* attempt to set client encoding to match server encoding, if needed */ if (PQclientEncoding(conn) != GetDatabaseEncoding()) PQsetClientEncoding(conn, GetDatabaseEncodingName()); + rconn->conn = conn; + if (connname) - { - rconn->conn = conn; createNewConnection(connname, rconn); - } else - pconn->conn = conn; + pconn = rconn; + + PG_RETURN_TEXT_P(cstring_to_text("OK")); +} + +PG_FUNCTION_INFO_V1(dblink_connect_self); +Datum +dblink_connect_self(PG_FUNCTION_ARGS) +{ + char *connname = NULL; + BackgroundSession *conn = NULL; + remoteConn *rconn = NULL; + MemoryContext oldcontext; + + if (PG_NARGS() == 1) + connname = text_to_cstring(PG_GETARG_TEXT_PP(0)); + + oldcontext = MemoryContextSwitchTo(TopMemoryContext); + conn = BackgroundSessionStart(); + MemoryContextSwitchTo(oldcontext); + + rconn = (remoteConn *) MemoryContextAllocZero(TopMemoryContext, + sizeof(remoteConn)); + rconn->bgconn = conn; + + if (connname) + createNewConnection(connname, rconn); + else + pconn = rconn; PG_RETURN_TEXT_P(cstring_to_text("OK")); } @@ -318,31 +358,25 @@ dblink_disconnect(PG_FUNCTION_ARGS) { char *conname = NULL; remoteConn *rconn = NULL; - PGconn *conn = NULL; - - dblink_init(); if (PG_NARGS() == 1) { conname = text_to_cstring(PG_GETARG_TEXT_PP(0)); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; } else - conn = pconn->conn; + rconn = pconn; - if (!conn) + if (!rconn) dblink_conn_not_avail(conname); - PQfinish(conn); - if (rconn) - { + dblink_finish_conn(rconn); + pfree(rconn); + + if (conname) deleteConnection(conname); - pfree(rconn); - } else - pconn->conn = NULL; + pconn = NULL; PG_RETURN_TEXT_P(cstring_to_text("OK")); } @@ -363,7 +397,6 @@ dblink_open(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ - dblink_init(); initStringInfo(&buf); if (PG_NARGS() == 2) @@ -401,10 +434,12 @@ dblink_open(PG_FUNCTION_ARGS) rconn = getConnectionByName(conname); } - if (!rconn || !rconn->conn) + if (!rconn) dblink_conn_not_avail(conname); - else - conn = rconn->conn; + + dblink_bgsession_not_supported(rconn); + + conn = rconn->conn; /* If we are not in a transaction, start one */ if (PQtransactionStatus(conn) == PQTRANS_IDLE) @@ -454,7 +489,6 @@ dblink_close(PG_FUNCTION_ARGS) remoteConn *rconn = NULL; bool fail = true; /* default to backward compatible behavior */ - dblink_init(); initStringInfo(&buf); if (PG_NARGS() == 1) @@ -488,10 +522,12 @@ dblink_close(PG_FUNCTION_ARGS) rconn = getConnectionByName(conname); } - if (!rconn || !rconn->conn) + if (!rconn) dblink_conn_not_avail(conname); - else - conn = rconn->conn; + + dblink_bgsession_not_supported(rconn); + + conn = rconn->conn; appendStringInfo(&buf, "CLOSE %s", curname); @@ -543,8 +579,6 @@ dblink_fetch(PG_FUNCTION_ARGS) prepTuplestoreResult(fcinfo); - dblink_init(); - if (PG_NARGS() == 4) { /* text,text,int,bool */ @@ -554,8 +588,6 @@ dblink_fetch(PG_FUNCTION_ARGS) fail = PG_GETARG_BOOL(3); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; } else if (PG_NARGS() == 3) { @@ -565,7 +597,7 @@ dblink_fetch(PG_FUNCTION_ARGS) curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); howmany = PG_GETARG_INT32(1); fail = PG_GETARG_BOOL(2); - conn = pconn->conn; + rconn = pconn; } else { @@ -574,8 +606,6 @@ dblink_fetch(PG_FUNCTION_ARGS) howmany = PG_GETARG_INT32(2); rconn = getConnectionByName(conname); - if (rconn) - conn = rconn->conn; } } else if (PG_NARGS() == 2) @@ -583,12 +613,16 @@ dblink_fetch(PG_FUNCTION_ARGS) /* text,int */ curname = text_to_cstring(PG_GETARG_TEXT_PP(0)); howmany = PG_GETARG_INT32(1); - conn = pconn->conn; + rconn = pconn; } - if (!conn) + if (!rconn) dblink_conn_not_avail(conname); + dblink_bgsession_not_supported(rconn); + + conn = rconn->conn; + initStringInfo(&buf); appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname); @@ -633,23 +667,30 @@ PG_FUNCTION_INFO_V1(dblink_send_query); Datum dblink_send_query(PG_FUNCTION_ARGS) { - PGconn *conn; + remoteConn *rconn; char *sql; int retval; if (PG_NARGS() == 2) { - conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } else /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); - /* async query send */ - retval = PQsendQuery(conn, sql); - if (retval != 1) - elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn))); + if (rconn->conn) + { + retval = PQsendQuery(rconn->conn, sql); + if (retval != 1) + elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(rconn->conn))); + } + else + { + BackgroundSessionSend(rconn->bgconn, sql); + retval = 1; + } PG_RETURN_INT32(retval); } @@ -664,13 +705,11 @@ dblink_get_result(PG_FUNCTION_ARGS) static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) { - PGconn *volatile conn = NULL; + remoteConn *volatile rconn = NULL; volatile bool freeconn = false; prepTuplestoreResult(fcinfo); - dblink_init(); - PG_TRY(); { char *sql = NULL; @@ -682,7 +721,7 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (PG_NARGS() == 3) { /* text,text,bool */ - dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); } @@ -691,20 +730,20 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) /* text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = pconn->conn; + rconn = pconn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); } else { - dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } } else if (PG_NARGS() == 1) { /* text */ - conn = pconn->conn; + rconn = pconn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); } else @@ -717,61 +756,75 @@ dblink_record_internal(FunctionCallInfo fcinfo, bool is_async) if (PG_NARGS() == 2) { /* text,bool */ - conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); fail = PG_GETARG_BOOL(1); } else if (PG_NARGS() == 1) { /* text */ - conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); } else /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); } - if (!conn) + if (!rconn) dblink_conn_not_avail(conname); if (!is_async) { - /* synchronous query, use efficient tuple collection method */ - materializeQueryResult(fcinfo, conn, conname, sql, fail); + if (rconn->conn) + /* synchronous query, use efficient tuple collection method */ + materializeQueryResult(fcinfo, rconn->conn, conname, sql, fail); + else if (rconn->bgconn) + { + BackgroundSessionResult *res = BackgroundSessionExecute(rconn->bgconn, sql); + materializeBgResult(fcinfo, rconn->bgconn, res); + } } else { /* async result retrieval, do it the old way */ - PGresult *res = PQgetResult(conn); - - /* NULL means we're all done with the async results */ - if (res) + if (rconn->conn) { - if (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK) - { - dblink_res_error(conn, conname, res, - "could not execute query", fail); - /* if fail isn't set, we'll return an empty query result */ - } - else + PGresult *res = PQgetResult(rconn->conn); + + /* NULL means we're all done with the async results */ + if (res) { - materializeResult(fcinfo, conn, res); + if (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK) + { + dblink_res_error(rconn->conn, conname, res, + "could not execute query", fail); + /* if fail isn't set, we'll return an empty query result */ + } + else + { + materializeResult(fcinfo, rconn->conn, res); + } } } + else + { + BackgroundSessionResult *res = BackgroundSessionGetResult(rconn->bgconn); + materializeBgResult(fcinfo, rconn->bgconn, res); + } } } PG_CATCH(); { /* if needed, close the connection to the database */ if (freeconn) - PQfinish(conn); + dblink_finish_conn(rconn); PG_RE_THROW(); } PG_END_TRY(); /* if needed, close the connection to the database */ if (freeconn) - PQfinish(conn); + dblink_finish_conn(rconn); return (Datum) 0; } @@ -806,6 +859,45 @@ prepTuplestoreResult(FunctionCallInfo fcinfo) } /* + * get a tuple descriptor for our result type + */ +static TupleDesc +dblink_get_call_result_type(FunctionCallInfo fcinfo, int nfields) +{ + TupleDesc tupdesc; + + switch (get_call_result_type(fcinfo, NULL, &tupdesc)) + { + case TYPEFUNC_COMPOSITE: + /* success */ + break; + case TYPEFUNC_RECORD: + /* failed to determine actual type of RECORD */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("function returning record called in context " + "that cannot accept type record"))); + break; + default: + /* result type isn't composite */ + elog(ERROR, "return type must be a row type"); + break; + } + + /* + * check result and tuple descriptor have the same number of columns + */ + if (nfields != tupdesc->natts) + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("remote query result rowtype does not match " + "the specified FROM clause rowtype"))); + + /* make sure we have a persistent copy of the tupdesc */ + return CreateTupleDescCopy(tupdesc); +} + +/* * Copy the contents of the PGresult into a tuplestore to be returned * as the result of the current function. * The PGresult will be released in this function. @@ -844,41 +936,11 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res) Assert(PQresultStatus(res) == PGRES_TUPLES_OK); is_sql_cmd = false; - - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); ntuples = PQntuples(res); nfields = PQnfields(res); + tupdesc = dblink_get_call_result_type(fcinfo, nfields); } - /* - * check result and tuple descriptor have the same number of columns - */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); - if (ntuples > 0) { AttInMetadata *attinmeta; @@ -948,6 +1010,95 @@ materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res) PG_END_TRY(); } +static void +materializeBgResult(FunctionCallInfo fcinfo, BackgroundSession *bgconn, BackgroundSessionResult *res) +{ + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + bool is_sql_cmd; + int nfields; + AttInMetadata *attinmeta; + HeapTuple outtuple; + Tuplestorestate *tupstore; + MemoryContext oldcontext; + char **values; + + /* prepTuplestoreResult must have been called previously */ + Assert(rsinfo->returnMode == SFRM_Materialize); + + if (!res->tupdesc) + { + is_sql_cmd = true; + + /* + * need a tuple descriptor representing one TEXT column to return + * the command status string as our result tuple + */ + tupdesc = CreateTemplateTupleDesc(1, false); + TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status", + TEXTOID, -1, 0); + nfields = 1; + } + else + { + is_sql_cmd = false; + nfields = res->tupdesc->natts; + tupdesc = dblink_get_call_result_type(fcinfo, nfields); + } + + attinmeta = TupleDescGetAttInMetadata(tupdesc); + + oldcontext = MemoryContextSwitchTo( + rsinfo->econtext->ecxt_per_query_memory); + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + values = (char **) palloc(nfields * sizeof(char *)); + + if (is_sql_cmd) + { + values[0] = (char *) res->command; + outtuple = BuildTupleFromCStrings(attinmeta, values); + tuplestore_puttuple(tupstore, outtuple); + } + else + { + int j; + + for (j = 0; j < res->ntuples; j++) + { + HeapTuple intuple = res->tuples[j]; + int i; + + for (i = 0; i < nfields; i++) + { + bool isnull; + Datum val; + + val = heap_getattr(intuple, i+1, res->tupdesc, &isnull); + if (isnull) + values[i] = NULL; + else + { + Oid typOutput; + bool typIsVarlena; + + getTypeOutputInfo(res->tupdesc->attrs[i]->atttypid, &typOutput, &typIsVarlena); + values[i] = DatumGetCString(OidFunctionCall1(typOutput, val)); + } + } + + outtuple = BuildTupleFromCStrings(attinmeta, values); + tuplestore_puttuple(tupstore, outtuple); + } + } + + /* clean up and return the tuplestore */ + tuplestore_donestoring(tupstore); +} + /* * Execute the given SQL command and store its results into a tuplestore * to be returned as the result of the current function. @@ -1165,34 +1316,7 @@ storeRow(volatile storeInfo *sinfo, PGresult *res, bool first) tuplestore_end(sinfo->tuplestore); sinfo->tuplestore = NULL; - /* get a tuple descriptor for our result type */ - switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc)) - { - case TYPEFUNC_COMPOSITE: - /* success */ - break; - case TYPEFUNC_RECORD: - /* failed to determine actual type of RECORD */ - ereport(ERROR, - (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("function returning record called in context " - "that cannot accept type record"))); - break; - default: - /* result type isn't composite */ - elog(ERROR, "return type must be a row type"); - break; - } - - /* make sure we have a persistent copy of the tupdesc */ - tupdesc = CreateTupleDescCopy(tupdesc); - - /* check result and tuple descriptor have the same number of columns */ - if (nfields != tupdesc->natts) - ereport(ERROR, - (errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("remote query result rowtype does not match " - "the specified FROM clause rowtype"))); + tupdesc = dblink_get_call_result_type(sinfo->fcinfo, nfields); /* Prepare attinmeta for later data conversions */ sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc); @@ -1292,13 +1416,14 @@ PG_FUNCTION_INFO_V1(dblink_is_busy); Datum dblink_is_busy(PG_FUNCTION_ARGS) { - PGconn *conn; + remoteConn *rconn; - dblink_init(); - conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); - PQconsumeInput(conn); - PG_RETURN_INT32(PQisBusy(conn)); + dblink_bgsession_not_supported(rconn); + + PQconsumeInput(rconn->conn); + PG_RETURN_INT32(PQisBusy(rconn->conn)); } /* @@ -1317,13 +1442,13 @@ Datum dblink_cancel_query(PG_FUNCTION_ARGS) { int res; - PGconn *conn; + remoteConn *rconn; PGcancel *cancel; char errbuf[256]; - dblink_init(); - conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); - cancel = PQgetCancel(conn); + rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + dblink_bgsession_not_supported(rconn); + cancel = PQgetCancel(rconn->conn); res = PQcancel(cancel, errbuf, 256); PQfreeCancel(cancel); @@ -1350,12 +1475,12 @@ Datum dblink_error_message(PG_FUNCTION_ARGS) { char *msg; - PGconn *conn; + remoteConn *rconn; - dblink_init(); - conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + dblink_bgsession_not_supported(rconn); - msg = PQerrorMessage(conn); + msg = PQerrorMessage(rconn->conn); if (msg == NULL || msg[0] == '\0') PG_RETURN_TEXT_P(cstring_to_text("OK")); else @@ -1370,14 +1495,11 @@ Datum dblink_exec(PG_FUNCTION_ARGS) { text *volatile sql_cmd_status = NULL; - PGconn *volatile conn = NULL; + remoteConn *volatile rconn = NULL; volatile bool freeconn = false; - dblink_init(); - PG_TRY(); { - PGresult *res = NULL; char *sql = NULL; char *conname = NULL; bool fail = true; /* default to backward compatible behavior */ @@ -1385,7 +1507,7 @@ dblink_exec(PG_FUNCTION_ARGS) if (PG_NARGS() == 3) { /* must be text,text,bool */ - dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); fail = PG_GETARG_BOOL(2); } @@ -1394,72 +1516,89 @@ dblink_exec(PG_FUNCTION_ARGS) /* might be text,text or text,bool */ if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID) { - conn = pconn->conn; + rconn = pconn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); fail = PG_GETARG_BOOL(1); } else { - dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &conn, &conname, &freeconn); + dblink_get_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)), &rconn, &conname, &freeconn); sql = text_to_cstring(PG_GETARG_TEXT_PP(1)); } } else if (PG_NARGS() == 1) { /* must be single text argument */ - conn = pconn->conn; + rconn = pconn; sql = text_to_cstring(PG_GETARG_TEXT_PP(0)); } else /* shouldn't happen */ elog(ERROR, "wrong number of arguments"); - if (!conn) + if (!rconn) dblink_conn_not_avail(conname); - res = PQexec(conn, sql); - if (!res || - (PQresultStatus(res) != PGRES_COMMAND_OK && - PQresultStatus(res) != PGRES_TUPLES_OK)) + if (rconn->conn) { - dblink_res_error(conn, conname, res, - "could not execute command", fail); + PGresult *res; - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text("ERROR"); - } - else if (PQresultStatus(res) == PGRES_COMMAND_OK) - { - /* - * and save a copy of the command status string to return as our - * result tuple - */ - sql_cmd_status = cstring_to_text(PQcmdStatus(res)); - PQclear(res); + res = PQexec(rconn->conn, sql); + + if (!res || + (PQresultStatus(res) != PGRES_COMMAND_OK && + PQresultStatus(res) != PGRES_TUPLES_OK)) + { + dblink_res_error(rconn->conn, conname, res, + "could not execute command", fail); + + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text("ERROR"); + } + else if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + /* + * and save a copy of the command status string to return as our + * result tuple + */ + sql_cmd_status = cstring_to_text(PQcmdStatus(res)); + PQclear(res); + } + else + { + PQclear(res); + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("statement returning results not allowed"))); + } } else { - PQclear(res); - ereport(ERROR, - (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), - errmsg("statement returning results not allowed"))); + BackgroundSessionResult *res; + + res = BackgroundSessionExecute(rconn->bgconn, sql); + if (res->tupdesc) + ereport(ERROR, + (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), + errmsg("statement returning results not allowed"))); + sql_cmd_status = cstring_to_text(res->command); } } PG_CATCH(); { /* if needed, close the connection to the database */ if (freeconn) - PQfinish(conn); + dblink_finish_conn(rconn); PG_RE_THROW(); } PG_END_TRY(); /* if needed, close the connection to the database */ if (freeconn) - PQfinish(conn); + dblink_finish_conn(rconn); PG_RETURN_TEXT_P(sql_cmd_status); } @@ -1866,7 +2005,7 @@ PG_FUNCTION_INFO_V1(dblink_get_notify); Datum dblink_get_notify(PG_FUNCTION_ARGS) { - PGconn *conn; + remoteConn *rconn; PGnotify *notify; ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; @@ -1876,11 +2015,12 @@ dblink_get_notify(PG_FUNCTION_ARGS) prepTuplestoreResult(fcinfo); - dblink_init(); if (PG_NARGS() == 1) - conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); + rconn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0))); else - conn = pconn->conn; + rconn = pconn; + + dblink_bgsession_not_supported(rconn); /* create the tuplestore in per-query memory */ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; @@ -1900,8 +2040,8 @@ dblink_get_notify(PG_FUNCTION_ARGS) MemoryContextSwitchTo(oldcontext); - PQconsumeInput(conn); - while ((notify = PQnotifies(conn)) != NULL) + PQconsumeInput(rconn->conn); + while ((notify = PQnotifies(rconn->conn)) != NULL) { Datum values[DBLINK_NOTIFY_COLS]; bool nulls[DBLINK_NOTIFY_COLS]; @@ -1924,7 +2064,7 @@ dblink_get_notify(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); PQfreemem(notify); - PQconsumeInput(conn); + PQconsumeInput(rconn->conn); } /* clean up and return the tuplestore */ @@ -2557,7 +2697,7 @@ createNewConnection(const char *name, remoteConn *rconn) if (found) { - PQfinish(rconn->conn); + dblink_finish_conn(rconn); pfree(rconn); ereport(ERROR, @@ -2592,15 +2732,14 @@ deleteConnection(const char *name) } static void -dblink_security_check(PGconn *conn, remoteConn *rconn) +dblink_security_check(remoteConn *rconn) { - if (!superuser()) + if (!superuser() && rconn->conn) { - if (!PQconnectionUsedPassword(conn)) + if (!PQconnectionUsedPassword(rconn->conn)) { - PQfinish(conn); - if (rconn) - pfree(rconn); + PQfinish(rconn->conn); + pfree(rconn); ereport(ERROR, (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED), diff --git a/contrib/dblink/dblink.control b/contrib/dblink/dblink.control index bdd17d28a4..816d19f448 100644 --- a/contrib/dblink/dblink.control +++ b/contrib/dblink/dblink.control @@ -1,5 +1,5 @@ # dblink extension comment = 'connect to other PostgreSQL databases from within a database' -default_version = '1.2' +default_version = '1.3' module_pathname = '$libdir/dblink' relocatable = true diff --git a/contrib/dblink/expected/dblink.out b/contrib/dblink/expected/dblink.out index 4b6d26e574..6faa525f0c 100644 --- a/contrib/dblink/expected/dblink.out +++ b/contrib/dblink/expected/dblink.out @@ -1124,3 +1124,98 @@ SELECT dblink_disconnect('myconn'); RESET datestyle; RESET intervalstyle; RESET timezone; +-- background sessions +SELECT * FROM foo; + f1 | f2 | f3 +----+----+--------------- + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} + 3 | d | {a3,b3,c3} + 4 | e | {a4,b4,c4} + 5 | f | {a5,b5,c5} + 6 | g | {a6,b6,c6} + 7 | h | {a7,b7,c7} + 8 | i | {a8,b8,c8} + 9 | j | {a9,b9,c9} + 10 | k | {a10,b10,c10} +(11 rows) + +SELECT dblink_connect_self(); + dblink_connect_self +--------------------- + OK +(1 row) + +SELECT dblink_exec('SELECT * FROM foo'); +ERROR: statement returning results not allowed +SELECT dblink_exec('DELETE FROM foo WHERE f1 > 5'); + dblink_exec +------------- + DELETE 5 +(1 row) + +SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]); + a | b | c +---+---+------------ + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} + 3 | d | {a3,b3,c3} + 4 | e | {a4,b4,c4} + 5 | f | {a5,b5,c5} +(6 rows) + +SELECT * FROM dblink('DELETE FROM foo WHERE f1 > 5') AS (status text); + status +---------- + DELETE 0 +(1 row) + +SELECT dblink_disconnect(); + dblink_disconnect +------------------- + OK +(1 row) + +SELECT * FROM foo; + f1 | f2 | f3 +----+----+------------ + 0 | a | {a0,b0,c0} + 1 | b | {a1,b1,c1} + 2 | c | {a2,b2,c2} + 3 | d | {a3,b3,c3} + 4 | e | {a4,b4,c4} + 5 | f | {a5,b5,c5} +(6 rows) + +-- asynchronous case +SELECT dblink_connect_self('myconn'); + dblink_connect_self +--------------------- + OK +(1 row) + +SELECT * +FROM dblink_send_query('myconn', + 'SELECT * FROM + (VALUES (''12.03.2013 00:00:00+00''), + (''12.03.2013 00:00:00+00'')) t'); + dblink_send_query +------------------- + 1 +(1 row) + +SELECT * from dblink_get_result('myconn') as t(t timestamptz); + t +------------------------------ + Mon Dec 02 16:00:00 2013 PST + Mon Dec 02 16:00:00 2013 PST +(2 rows) + +SELECT dblink_disconnect('myconn'); + dblink_disconnect +------------------- + OK +(1 row) + diff --git a/contrib/dblink/sql/dblink.sql b/contrib/dblink/sql/dblink.sql index 681cf6a6e8..7f66a98aa5 100644 --- a/contrib/dblink/sql/dblink.sql +++ b/contrib/dblink/sql/dblink.sql @@ -563,3 +563,23 @@ CREATE TEMPORARY TABLE result (t timestamptz); RESET datestyle; RESET intervalstyle; RESET timezone; + +-- background sessions +SELECT * FROM foo; +SELECT dblink_connect_self(); +SELECT dblink_exec('SELECT * FROM foo'); +SELECT dblink_exec('DELETE FROM foo WHERE f1 > 5'); +SELECT * FROM dblink('SELECT * FROM foo') AS t(a int, b text, c text[]); +SELECT * FROM dblink('DELETE FROM foo WHERE f1 > 5') AS (status text); +SELECT dblink_disconnect(); +SELECT * FROM foo; + +-- asynchronous case +SELECT dblink_connect_self('myconn'); +SELECT * +FROM dblink_send_query('myconn', + 'SELECT * FROM + (VALUES (''12.03.2013 00:00:00+00''), + (''12.03.2013 00:00:00+00'')) t'); +SELECT * from dblink_get_result('myconn') as t(t timestamptz); +SELECT dblink_disconnect('myconn'); diff --git a/doc/src/sgml/dblink.sgml b/doc/src/sgml/dblink.sgml index f19c6b19f5..4b070ddb23 100644 --- a/doc/src/sgml/dblink.sgml +++ b/doc/src/sgml/dblink.sgml @@ -239,6 +239,43 @@ <title>Description</title> </refsect1> </refentry> + <refentry id="CONTRIB-DBLINK-CONNECT-SELF"> + <indexterm> + <primary>dblink_connect_self</primary> + </indexterm> + + <refmeta> + <refentrytitle>dblink_connect_self</refentrytitle> + <manvolnum>3</manvolnum> + </refmeta> + + <refnamediv> + <refname>dblink_connect_self</refname> + <refpurpose>opens a persistent connection to the same database</refpurpose> + </refnamediv> + + <refsynopsisdiv> +<synopsis> +dblink_connect_self() returns text +dblink_connect_self(text connname) returns text +</synopsis> + </refsynopsisdiv> + + <refsect1> + <title>Description</title> + + <para> + <function>dblink_connect_self()</> opens a connection to the current + database and as the current user, using the background session API + (<xref linkend="bgsession">). + </para> + + <para> + For further details see <function>dblink_connect()</>. + </para> + </refsect1> + </refentry> + <refentry id="CONTRIB-DBLINK-DISCONNECT"> <indexterm> <primary>dblink_disconnect</primary> -- 2.12.0
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers