Here is a patch for the background sessions C API and PL/Python support. This was previously submitted as "autonomous transactions", which proved controversial, and there have been several suggestions for a new name.
I have renamed everything, removed all the incomplete PL/pgSQL stuff, refined the PL/Python interfaces, and added resource owner management so that session handles can be preserved across transactions. That allows pg_background-like behavior to be implemented in a PL function. I have also added documentation, so reviewers could start there. The patch is probably not quite done yet, but I think it contains a lot of useful pieces that we could make into something nice.

-- 
Peter Eisentraut http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
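To make the "preserve session handles" point concrete, here is a minimal sketch of the pattern, written so that it runs outside PostgreSQL. Everything named here is an assumption made for illustration: `StubBackgroundSession` is a stand-in that only records statements (inside a PL/Python function one would pass `plpy.BackgroundSession` as the factory instead), and `get_session`, `load_batch` and the `"bgsession"` key are hypothetical names, not part of the patch. Unlike the `bulkload()` documentation example in the patch, which appears to send BEGIN on every iteration, this sketch sends BEGIN once per batch.

```python
class StubBackgroundSession:
    """Stand-in for plpy.BackgroundSession (assumption, not the patch's class).

    It only records the statements it is given; it does not talk to a server.
    """

    def __init__(self):
        self.statements = []
        self.closed = False

    def execute(self, sql):
        self.statements.append(sql)

    def close(self):
        self.closed = True


def get_session(SD, factory):
    """Return the session cached in SD, creating it on first use."""
    session = SD.get("bgsession")
    if session is None:
        session = factory()
        SD["bgsession"] = session
    return session


def load_batch(session, values, batch_size=100):
    """Insert values, committing after every batch_size rows.

    BEGIN is sent once per batch, so it is never sent while a
    transaction block is already open.
    """
    in_transaction = False
    for count, value in enumerate(values, start=1):
        if not in_transaction:
            session.execute("BEGIN")
            in_transaction = True
        session.execute("INSERT INTO test1 (a) VALUES (%d)" % value)
        if count % batch_size == 0:
            session.execute("COMMIT")
            in_transaction = False
    if in_transaction:
        # Commit the final, partial batch so no transaction block stays open.
        session.execute("COMMIT")


# Simulate two calls of the same PL/Python function; SD persists between them.
SD = {}
first = get_session(SD, StubBackgroundSession)
load_batch(first, range(1, 251))
second = get_session(SD, StubBackgroundSession)
load_batch(second, range(251, 301))
```

The final COMMIT matters because, per the documentation in the patch, it is an error to close a background session with a transaction block open.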
From 661c7fe769982e3f0c71f4ad57a768d7eb55f6e2 Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Mon, 31 Oct 2016 12:00:00 -0400
Subject: [PATCH] Add background sessions

This adds a C API to run SQL statements in a background worker,
communicating by FE/BE protocol over a DSM message queue. This can be
used to execute statements and transactions separate from the main
foreground session.

Also included is a PL/Python interface to this functionality.
---
 doc/src/sgml/bgsession.sgml                     | 236 +++++++
 doc/src/sgml/filelist.sgml                      |   1 +
 doc/src/sgml/plpython.sgml                      | 102 +++
 doc/src/sgml/postgres.sgml                      |   1 +
 src/backend/commands/variable.c                 |   5 +
 src/backend/libpq/pqmq.c                        |  23 +
 src/backend/tcop/Makefile                       |   2 +-
 src/backend/tcop/bgsession.c                    | 890 ++++++++++++++++++++++++
 src/backend/tcop/postgres.c                     |  24 +-
 src/include/commands/variable.h                 |   1 +
 src/include/libpq/pqmq.h                        |   1 +
 src/include/tcop/bgsession.h                    |  26 +
 src/include/tcop/tcopprot.h                     |   9 +
 src/pl/plpython/Makefile                        |   2 +
 src/pl/plpython/expected/plpython_bgsession.out | 188 +++++
 src/pl/plpython/expected/plpython_test.out      |   7 +-
 src/pl/plpython/plpy_bgsession.c                | 454 ++++++++++++
 src/pl/plpython/plpy_bgsession.h                |  18 +
 src/pl/plpython/plpy_main.h                     |   3 +
 src/pl/plpython/plpy_planobject.c               |   1 +
 src/pl/plpython/plpy_planobject.h               |   2 +
 src/pl/plpython/plpy_plpymodule.c               |   5 +
 src/pl/plpython/plpy_spi.c                      |   7 +-
 src/pl/plpython/plpy_spi.h                      |   3 +
 src/pl/plpython/sql/plpython_bgsession.sql      | 148 ++++
 25 files changed, 2139 insertions(+), 20 deletions(-)
 create mode 100644 doc/src/sgml/bgsession.sgml
 create mode 100644 src/backend/tcop/bgsession.c
 create mode 100644 src/include/tcop/bgsession.h
 create mode 100644 src/pl/plpython/expected/plpython_bgsession.out
 create mode 100644 src/pl/plpython/plpy_bgsession.c
 create mode 100644 src/pl/plpython/plpy_bgsession.h
 create mode 100644 src/pl/plpython/sql/plpython_bgsession.sql

diff --git a/doc/src/sgml/bgsession.sgml b/doc/src/sgml/bgsession.sgml
new file mode 100644
index 0000000..785ee66
--- /dev/null
+++ b/doc/src/sgml/bgsession.sgml
@@ -0,0 +1,236 @@
+<!-- doc/src/sgml/bgsession.sgml -->
+
+<chapter id="bgsession">
+ <title>Background Session API</title>
+
+ <para>
+  The background session API is a C API for creating additional database
+  sessions in the background and running SQL statements in them. A background
+  session behaves like a normal (foreground) session in that it has session
+  state, transactions, can run SQL statements, and so on. Unlike a foreground
+  session, it is not connected directly to a client. Instead the foreground
+  session can use this API to execute SQL statements and retrieve their
+  results. Higher-level integrations, such as in procedural languages, can
+  make this functionality available to clients. Background sessions are
+  independent from their foreground sessions in their session and transaction
+  state. So a background session cannot see uncommitted data in foreground
+  sessions or vice versa, and there is no preferential treatment about
+  locking. Like all sessions, background sessions are separate processes.
+  Foreground and background sessions communicate over shared memory message
+  queues instead of the sockets that a client/server connection uses.
+ </para>
+
+ <para>
+  Background sessions can be useful in a variety of scenarios when effects
+  that are independent of the foreground session are to be achieved, for
+  example:
+  <itemizedlist>
+   <listitem>
+    <para>
+     Commit data independent of whether a foreground transaction commits, for
+     example for auditing. A trigger in the foreground session could effect
+     the necessary writes via a background session.
+    </para>
+   </listitem>
+   <listitem>
+    <para>
+     Large changes can be split up into smaller transactions. A foreground
+     session can orchestrate the logic, for example in a function, while the
+     actual writes and commits are executed in a background session.
+    </para>
+   </listitem>
+  </itemizedlist>
+ </para>
+
+ <sect1 id="bgsession-types">
+  <title>API</title>
+
+  <para>
+   Use <literal>#include "tcop/bgsession.h"</literal> to include the API
+   declarations.
+  </para>
+
+  <sect2>
+   <title>Types</title>
+
+   <variablelist>
+    <varlistentry>
+     <term><type>BackgroundSession</type></term>
+     <listitem>
+      <para>
+       An opaque connection handle. Multiple background sessions can exist at
+       the same time, and each is represented by an instance of this type.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionPreparedStatement</type></term>
+     <listitem>
+      <para>
+       An opaque handle for a prepared statement, created when the statement
+       is prepared and used when the statement is executed.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><type>BackgroundSessionResult</type></term>
+     <listitem>
+      <para>
+       A handle for a query result, defined as:
+<programlisting>
+typedef struct BackgroundSessionResult
+{
+    TupleDesc tupdesc;
+    List *tuples;
+    const char *command;
+} BackgroundSessionResult;
+</programlisting>
+       <structfield>tupdesc</structfield> describes the result
+       rows, <symbol>NULL</symbol> if the command does not return
+       rows. <structfield>tuples</structfield> is a list
+       of <type>HeapTuple</type>s with the result
+       rows. <structfield>command</structfield> is the tag of the executed
+       command.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </sect2>
+
+  <sect2>
+   <title>Functions</title>
+
+   <variablelist>
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSession *<function>BackgroundSessionStart</function></funcdef>
+        <paramdef>void</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Creates a background session. This starts the background worker and
+       establishes a connection to it.
+      </para>
+
+      <para>
+       A background session does not automatically end when the foreground
+       transaction or session ends.
+       Use <function>BackgroundSessionEnd()</function> to end a background
+       session.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>void <function>BackgroundSessionEnd</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Ends a background session. This closes the connection to the background
+       worker.
+      </para>
+
+      <para>
+       It is an error to close a background session with a transaction block
+       open.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecute</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute an SQL statement and return the result. Access the fields of
+       the result structure to get details about the command result. If there
+       is an error, this function does not return.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionPreparedStatement *<function>BackgroundSessionPrepare</function></funcdef>
+        <paramdef>BackgroundSession *<parameter>session</parameter></paramdef>
+        <paramdef>const char *<parameter>sql</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Oid <parameter>argtypes</parameter>[]</paramdef>
+        <paramdef>const char *<parameter>argnames</parameter>[]</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Prepare an SQL statement for later execution
+       by <function>BackgroundSessionExecutePrepared()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term>
+      <funcsynopsis>
+       <funcprototype>
+        <funcdef>BackgroundSessionResult *<function>BackgroundSessionExecutePrepared</function></funcdef>
+        <paramdef>BackgroundSessionPreparedStatement *<parameter>stmt</parameter></paramdef>
+        <paramdef>int <parameter>nargs</parameter></paramdef>
+        <paramdef>Datum <parameter>values</parameter>[]</paramdef>
+        <paramdef>bool <parameter>nulls</parameter>[]</paramdef>
+       </funcprototype>
+      </funcsynopsis>
+     </term>
+     <listitem>
+      <para>
+       Execute a statement previously prepared by
+       <function>BackgroundSessionPrepare()</function>.
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+
+   <para>
+    Here is a very simple example:
+
+<programlisting>
+#include "tcop/bgsession.h"
+
+void
+myfunc()
+{
+    BackgroundSession *session;
+    BackgroundSessionResult *result;
+
+    session = BackgroundSessionStart();
+
+    result = BackgroundSessionExecute(session, "SELECT ...");
+    elog(INFO, "returned %d rows", list_length(result->tuples));
+
+    BackgroundSessionEnd(session);
+}
+</programlisting>
+   </para>
+  </sect2>
+ </sect1>
+</chapter>
diff --git a/doc/src/sgml/filelist.sgml b/doc/src/sgml/filelist.sgml
index 69649a7..7531c8f 100644
--- a/doc/src/sgml/filelist.sgml
+++ b/doc/src/sgml/filelist.sgml
@@ -52,6 +52,7 @@
 <!ENTITY wal SYSTEM "wal.sgml">
 
 <!-- programmer's guide -->
+<!ENTITY bgsession SYSTEM "bgsession.sgml">
 <!ENTITY bgworker SYSTEM "bgworker.sgml">
 <!ENTITY dfunc SYSTEM "dfunc.sgml">
 <!ENTITY ecpg SYSTEM "ecpg.sgml">
diff --git a/doc/src/sgml/plpython.sgml b/doc/src/sgml/plpython.sgml
index 4639778..53a2e8f 100644
--- a/doc/src/sgml/plpython.sgml
+++ b/doc/src/sgml/plpython.sgml
@@ -1357,6 +1357,108 @@ <title>Older Python Versions</title>
   </sect2>
  </sect1>
 
+ <sect1 id="plpython-bgsession">
+  <title>Background Sessions</title>
+
+  <para>
+   PL/Python exposes the background session interface described
+   in <xref linkend="bgsession"> as a Python API. A background session is
+   represented by a Python object of type <type>plpy.BackgroundSession</type>.
+   It has these methods:
+
+   <variablelist>
+    <varlistentry>
+     <term><literal>.close()</literal></term>
+     <listitem>
+      <para>
+       Close the session. This must be called or the session will leak.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute(<parameter>sql</parameter>)</literal></term>
+     <listitem>
+      <para>
+       Execute an SQL statement. The return value is a result object of the
+       same type as returned by <literal>plpy.execute</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.prepare(<parameter>sql</parameter>, <replaceable>types</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Prepare an SQL statement. The parameter data types are specified as
+       for <literal>plpy.prepare</literal>. The return value is a plan
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.execute_prepared(<parameter>plan</parameter>, <replaceable>values</replaceable>)</literal></term>
+     <listitem>
+      <para>
+       Execute a prepared statement. The parameter values are specified as
+       for <literal>plpy.execute</literal>. The return value is a result
+       object.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__enter__()</literal></term>
+     <listitem>
+      <para>
+       This just returns self. It is supplied so that
+       a <classname>BackgroundSession</classname> object can be used as a
+       context manager. See example below.
+      </para>
+     </listitem>
+    </varlistentry>
+
+    <varlistentry>
+     <term><literal>.__exit__(<parameter>exc_type</parameter>, <parameter>exc</parameter>, <parameter>tb</parameter>)</literal></term>
+     <listitem>
+      <para>
+       This is the same as <literal>.close()</literal>. It is supplied so
+       that a <classname>BackgroundSession</classname> object can be used as a
+       context manager. (The arguments are ignored.)
+      </para>
+     </listitem>
+    </varlistentry>
+   </variablelist>
+  </para>
+
+  <para>
+   A <classname>BackgroundSession</classname> object can be preserved across
+   function calls or passed between functions via the <varname>SD</varname>
+   and <varname>GD</varname> variables.
+  </para>
+
+  <para>
+   Example:
+<programlisting>
+CREATE FUNCTION bulkload() RETURNS integer
+LANGUAGE plpythonu
+AS $$
+with plpy.BackgroundSession() as a:
+    for i in range(1, 1001):
+        a.execute("BEGIN")
+        a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+        if i % 100 == 0:
+            a.execute("COMMIT")
+
+return 0
+$$;
+</programlisting>
+   This function inserts 1000 values into the table and commits after every
+   100 values.
+  </para>
+ </sect1>
+
  <sect1 id="plpython-util">
   <title>Utility Functions</title>
   <para>
diff --git a/doc/src/sgml/postgres.sgml b/doc/src/sgml/postgres.sgml
index 9143917..e281002 100644
--- a/doc/src/sgml/postgres.sgml
+++ b/doc/src/sgml/postgres.sgml
@@ -220,6 +220,7 @@ <title>Server Programming</title>
 
   &spi;
   &bgworker;
+  &bgsession;
   &logicaldecoding;
   &replication-origins;
 
diff --git a/src/backend/commands/variable.c b/src/backend/commands/variable.c
index defafa5..a522c69 100644
--- a/src/backend/commands/variable.c
+++ b/src/backend/commands/variable.c
@@ -674,12 +674,17 @@ show_random_seed(void)
  * SET CLIENT_ENCODING
  */
 
+void (*check_client_encoding_hook)(void);
+
 bool
 check_client_encoding(char **newval, void **extra, GucSource source)
 {
     int encoding;
     const char *canonical_name;
 
+    if (check_client_encoding_hook)
+        check_client_encoding_hook();
+
     /* Look up the encoding by name */
     encoding = pg_valid_client_encoding(*newval);
     if (encoding < 0)
diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c
index f93ccae..160f05b 100644
--- a/src/backend/libpq/pqmq.c
+++ b/src/backend/libpq/pqmq.c
@@ -48,6 +48,11 @@ static PQcommMethods PqCommMqMethods = {
     mq_endcopyout
 };
 
+static PQcommMethods *save_PqCommMethods;
+static CommandDest save_whereToSendOutput;
+static ProtocolVersion save_FrontendProtocol;
+static dsm_segment *save_seg;
+
 /*
  * Arrange to redirect frontend/backend protocol messages to a shared-memory
  * message queue.
@@ -55,12 +60,30 @@ static PQcommMethods PqCommMqMethods = {
 void
 pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh)
 {
+    save_PqCommMethods = PqCommMethods;
+    save_whereToSendOutput = whereToSendOutput;
+    save_FrontendProtocol = FrontendProtocol;
+
     PqCommMethods = &PqCommMqMethods;
     pq_mq = shm_mq_get_queue(mqh);
     pq_mq_handle = mqh;
     whereToSendOutput = DestRemote;
     FrontendProtocol = PG_PROTOCOL_LATEST;
     on_dsm_detach(seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+
+    save_seg = seg;
+}
+
+void
+pq_stop_redirect_to_shm_mq(void)
+{
+    cancel_on_dsm_detach(save_seg, pq_cleanup_redirect_to_shm_mq, (Datum) 0);
+    PqCommMethods = save_PqCommMethods;
+    whereToSendOutput = save_whereToSendOutput;
+    FrontendProtocol = save_FrontendProtocol;
+    pq_mq = NULL;
+    pq_mq_handle = NULL;
+    save_seg = NULL;
 }
 
 /*
diff --git a/src/backend/tcop/Makefile b/src/backend/tcop/Makefile
index 674302f..ab26767 100644
--- a/src/backend/tcop/Makefile
+++ b/src/backend/tcop/Makefile
@@ -12,7 +12,7 @@ subdir = src/backend/tcop
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS= dest.o fastpath.o postgres.o pquery.o utility.o
+OBJS= bgsession.o dest.o fastpath.o postgres.o pquery.o utility.o
 
 ifneq (,$(filter $(PORTNAME),cygwin win32))
 override CPPFLAGS += -DWIN32_STACK_RLIMIT=$(WIN32_STACK_RLIMIT)
diff --git a/src/backend/tcop/bgsession.c b/src/backend/tcop/bgsession.c
new file mode 100644
index 0000000..2cc6438
--- /dev/null
+++ b/src/backend/tcop/bgsession.c
@@ -0,0 +1,890 @@
+/*--------------------------------------------------------------------------
+ *
+ * bgsession.c
+ *      Run SQL commands using a background worker.
+ *
+ * Copyright (C) 2016, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *      src/backend/tcop/bgsession.c
+ *
+ *
+ * This implements a C API to open a background session and run SQL queries
+ * in it. The session looks much like a normal database connection, but it is
+ * always to the same database, and there is no authentication needed. The
+ * "backend" for that connection is a background worker. The normal backend
+ * and the background session worker communicate over the normal FE/BE
+ * protocol.
+ *
+ * Types:
+ *
+ * BackgroundSession -- opaque connection handle
+ * BackgroundSessionPreparedStatement -- opaque prepared statement handle
+ * BackgroundSessionResult -- query result
+ *
+ * Functions:
+ *
+ * BackgroundSessionStart() -- start a session (launches background worker)
+ * and return a handle
+ *
+ * BackgroundSessionEnd() -- close session and free resources
+ *
+ * BackgroundSessionExecute() -- run SQL string and return result (rows or
+ * status)
+ *
+ * BackgroundSessionPrepare() -- prepare an SQL string for subsequent
+ * execution
+ *
+ * BackgroundSessionExecutePrepared() -- run prepared statement
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "access/tupdesc.h"
+#include "access/xact.h"
+#include "commands/async.h"
+#include "commands/variable.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/pg_list.h"
+#include "pgstat.h"
+#include "postmaster/bgworker.h"
+#include "storage/dsm.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "tcop/bgsession.h"
+#include "tcop/tcopprot.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+/* Table-of-contents constants for our dynamic shared memory segment. */
+#define BGSESSION_MAGIC 0x50674267
+
+#define BGSESSION_KEY_FIXED_DATA 0
+#define BGSESSION_KEY_GUC 1
+#define BGSESSION_KEY_COMMAND_QUEUE 2
+#define BGSESSION_KEY_RESPONSE_QUEUE 3
+#define BGSESSION_NKEYS 4
+
+#define BGSESSION_QUEUE_SIZE 16384
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct background_session_fixed_data
+{
+    Oid database_id;
+    Oid authenticated_user_id;
+    Oid current_user_id;
+    int sec_context;
+} background_session_fixed_data;
+
+struct BackgroundSession
+{
+    ResourceOwner resowner;
+    dsm_segment *seg;
+    BackgroundWorkerHandle *worker_handle;
+    shm_mq_handle *command_qh;
+    shm_mq_handle *response_qh;
+    int transaction_status;
+};
+
+struct BackgroundSessionPreparedStatement
+{
+    BackgroundSession *session;
+    Oid *argtypes;
+    TupleDesc tupdesc;
+};
+
+static void bgsession_worker_main(Datum main_arg);
+static void shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg);
+static void bgsession_check_client_encoding_hook(void);
+static TupleDesc TupleDesc_from_RowDescription(StringInfo msg);
+static HeapTuple HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg);
+static void forward_NotifyResponse(StringInfo msg);
+static void rethrow_errornotice(StringInfo msg);
+static void invalid_protocol_message(char msgtype) pg_attribute_noreturn();
+
+
+BackgroundSession *
+BackgroundSessionStart(void)
+{
+    ResourceOwner oldowner;
+    BackgroundWorker worker;
+    pid_t pid;
+    BackgroundSession *session;
+    shm_toc_estimator e;
+    Size segsize;
+    Size guc_len;
+    char *gucstate;
+    dsm_segment *seg;
+    shm_toc *toc;
+    background_session_fixed_data *fdata;
+    shm_mq *command_mq;
+    shm_mq *response_mq;
+    BgwHandleStatus bgwstatus;
+    StringInfoData msg;
+    char msgtype;
+
+    session = palloc(sizeof(*session));
+
+    session->resowner = ResourceOwnerCreate(NULL, "background session");
+
+    shm_toc_initialize_estimator(&e);
+    shm_toc_estimate_chunk(&e, sizeof(background_session_fixed_data));
+    shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+    shm_toc_estimate_chunk(&e, BGSESSION_QUEUE_SIZE);
+    guc_len = EstimateGUCStateSpace();
+    shm_toc_estimate_chunk(&e, guc_len);
+    shm_toc_estimate_keys(&e, BGSESSION_NKEYS);
+    segsize = shm_toc_estimate(&e);
+    oldowner = CurrentResourceOwner;
+    PG_TRY();
+    {
+        CurrentResourceOwner = session->resowner;
+        seg = dsm_create(segsize, 0);
+    }
+    PG_CATCH();
+    {
+        CurrentResourceOwner = oldowner;
+        PG_RE_THROW();
+    }
+    PG_END_TRY();
+    CurrentResourceOwner = oldowner;
+
+    session->seg = seg;
+
+    toc = shm_toc_create(BGSESSION_MAGIC, dsm_segment_address(seg), segsize);
+
+    /* Store fixed-size data in dynamic shared memory. */
+    fdata = shm_toc_allocate(toc, sizeof(*fdata));
+    fdata->database_id = MyDatabaseId;
+    fdata->authenticated_user_id = GetAuthenticatedUserId();
+    GetUserIdAndSecContext(&fdata->current_user_id, &fdata->sec_context);
+    shm_toc_insert(toc, BGSESSION_KEY_FIXED_DATA, fdata);
+
+    /* Store GUC state in dynamic shared memory. */
+    gucstate = shm_toc_allocate(toc, guc_len);
+    SerializeGUCState(guc_len, gucstate);
+    shm_toc_insert(toc, BGSESSION_KEY_GUC, gucstate);
+
+    command_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+                               BGSESSION_QUEUE_SIZE);
+    shm_toc_insert(toc, BGSESSION_KEY_COMMAND_QUEUE, command_mq);
+    shm_mq_set_sender(command_mq, MyProc);
+
+    response_mq = shm_mq_create(shm_toc_allocate(toc, BGSESSION_QUEUE_SIZE),
+                                BGSESSION_QUEUE_SIZE);
+    shm_toc_insert(toc, BGSESSION_KEY_RESPONSE_QUEUE, response_mq);
+    shm_mq_set_receiver(response_mq, MyProc);
+
+    session->command_qh = shm_mq_attach(command_mq, seg, NULL);
+    session->response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+    worker.bgw_flags =
+        BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+    worker.bgw_start_time = BgWorkerStart_ConsistentState;
+    worker.bgw_restart_time = BGW_NEVER_RESTART;
+    worker.bgw_main = bgsession_worker_main;
+    snprintf(worker.bgw_name, BGW_MAXLEN, "background session by PID %d", MyProcPid);
+    worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg));
+    worker.bgw_notify_pid = MyProcPid;
+
+    if (!RegisterDynamicBackgroundWorker(&worker, &session->worker_handle))
+        ereport(ERROR,
+                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+                 errmsg("could not register background process"),
+                 errhint("You might need to increase max_worker_processes.")));
+
+    shm_mq_set_handle(session->command_qh, session->worker_handle);
+    shm_mq_set_handle(session->response_qh, session->worker_handle);
+
+    bgwstatus = WaitForBackgroundWorkerStartup(session->worker_handle, &pid);
+    if (bgwstatus != BGWH_STARTED)
+        ereport(ERROR,
+                (errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+                 errmsg("could not start background worker")));
+
+    do
+    {
+        shm_mq_receive_stringinfo(session->response_qh, &msg);
+        msgtype = pq_getmsgbyte(&msg);
+
+        switch (msgtype)
+        {
+            case 'E':
+            case 'N':
+                rethrow_errornotice(&msg);
+                break;
+            case 'Z':
+                session->transaction_status = pq_getmsgbyte(&msg);
+                pq_getmsgend(&msg);
+                break;
+            default:
+                invalid_protocol_message(msgtype);
+                break;
+        }
+    }
+    while (msgtype != 'Z');
+
+    return session;
+}
+
+
+void
+BackgroundSessionEnd(BackgroundSession *session)
+{
+    StringInfoData msg;
+
+    if (session->transaction_status == 'T')
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("background session ended with transaction block open")));
+
+    pq_redirect_to_shm_mq(session->seg, session->command_qh);
+    pq_beginmessage(&msg, 'X');
+    pq_endmessage(&msg);
+    pq_stop_redirect_to_shm_mq();
+
+    pfree(session->worker_handle);
+    dsm_detach(session->seg);
+    ResourceOwnerRelease(session->resowner, RESOURCE_RELEASE_BEFORE_LOCKS, false, false);
+    ResourceOwnerDelete(session->resowner);
+    pfree(session);
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecute(BackgroundSession *session, const char *sql)
+{
+    StringInfoData msg;
+    char msgtype;
+    BackgroundSessionResult *result;
+
+    pq_redirect_to_shm_mq(session->seg, session->command_qh);
+    pq_beginmessage(&msg, 'Q');
+    pq_sendstring(&msg, sql);
+    pq_endmessage(&msg);
+    pq_stop_redirect_to_shm_mq();
+
+    result = palloc0(sizeof(*result));
+
+    do
+    {
+        shm_mq_receive_stringinfo(session->response_qh, &msg);
+        msgtype = pq_getmsgbyte(&msg);
+
+        switch (msgtype)
+        {
+            case 'A':
+                forward_NotifyResponse(&msg);
+                break;
+            case 'C':
+                {
+                    const char *tag = pq_getmsgstring(&msg);
+                    result->command = pstrdup(tag);
+                    pq_getmsgend(&msg);
+                    break;
+                }
+            case 'D':
+                if (!result->tupdesc)
+                    elog(ERROR, "no T before D");
+                result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(result->tupdesc, &msg));
+                pq_getmsgend(&msg);
+                break;
+            case 'E':
+            case 'N':
+                rethrow_errornotice(&msg);
+                break;
+            case 'T':
+                if (result->tupdesc)
+                    elog(ERROR, "already received a T message");
+                result->tupdesc = TupleDesc_from_RowDescription(&msg);
+                pq_getmsgend(&msg);
+                break;
+            case 'Z':
+                session->transaction_status = pq_getmsgbyte(&msg);
+                pq_getmsgend(&msg);
+                break;
+            default:
+                invalid_protocol_message(msgtype);
+                break;
+        }
+    }
+    while (msgtype != 'Z');
+
+    return result;
+}
+
+
+BackgroundSessionPreparedStatement *
+BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs,
+                         Oid argtypes[], const char *argnames[])
+{
+    BackgroundSessionPreparedStatement *result;
+    StringInfoData msg;
+    int i;
+    char msgtype;
+
+    pq_redirect_to_shm_mq(session->seg, session->command_qh);
+    pq_beginmessage(&msg, 'P');
+    pq_sendstring(&msg, "");
+    pq_sendstring(&msg, sql);
+    pq_sendint(&msg, nargs, 2);
+    for (i = 0; i < nargs; i++)
+        pq_sendint(&msg, argtypes[i], 4);
+    if (argnames)
+        for (i = 0; i < nargs; i++)
+            pq_sendstring(&msg, argnames[i]);
+    pq_endmessage(&msg);
+    pq_stop_redirect_to_shm_mq();
+
+    result = palloc0(sizeof(*result));
+    result->session = session;
+    result->argtypes = palloc(nargs * sizeof(*result->argtypes));
+    memcpy(result->argtypes, argtypes, nargs * sizeof(*result->argtypes));
+
+    shm_mq_receive_stringinfo(session->response_qh, &msg);
+    msgtype = pq_getmsgbyte(&msg);
+
+    switch (msgtype)
+    {
+        case '1':
+            break;
+        case 'E':
+            rethrow_errornotice(&msg);
+            break;
+        default:
+            invalid_protocol_message(msgtype);
+            break;
+    }
+
+    pq_redirect_to_shm_mq(session->seg, session->command_qh);
+    pq_beginmessage(&msg, 'D');
+    pq_sendbyte(&msg, 'S');
+    pq_sendstring(&msg, "");
+    pq_endmessage(&msg);
+    pq_stop_redirect_to_shm_mq();
+
+    do
+    {
+        shm_mq_receive_stringinfo(session->response_qh, &msg);
+        msgtype = pq_getmsgbyte(&msg);
+
+        switch (msgtype)
+        {
+            case 'A':
+                forward_NotifyResponse(&msg);
+                break;
+            case 'E':
+                rethrow_errornotice(&msg);
+                break;
+            case 'n':
+                break;
+            case 't':
+                /* ignore for now */
+                break;
+            case 'T':
+                if (result->tupdesc)
+                    elog(ERROR, "already received a T message");
+                result->tupdesc = TupleDesc_from_RowDescription(&msg);
+                pq_getmsgend(&msg);
+                break;
+            default:
+                invalid_protocol_message(msgtype);
+                break;
+        }
+    }
+    while (msgtype != 'n' && msgtype != 'T');
+
+    return result;
+}
+
+
+BackgroundSessionResult *
+BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[])
+{
+    BackgroundSession *session;
+    StringInfoData msg;
+    BackgroundSessionResult *result;
+    char msgtype;
+    int i;
+
+    session = stmt->session;
+
+    pq_redirect_to_shm_mq(session->seg, session->command_qh);
+    pq_beginmessage(&msg, 'B');
+    pq_sendstring(&msg, "");
+    pq_sendstring(&msg, "");
+    pq_sendint(&msg, 1, 2); /* number of parameter format codes */
+    pq_sendint(&msg, 1, 2);
+    pq_sendint(&msg, nargs, 2); /* number of parameter values */
+    for (i = 0; i < nargs; i++)
+    {
+        if (nulls[i])
+            pq_sendint(&msg, -1, 4);
+        else
+        {
+            Oid typsend;
+            bool typisvarlena;
+            bytea *outputbytes;
+
+            getTypeBinaryOutputInfo(stmt->argtypes[i], &typsend, &typisvarlena);
+            outputbytes = OidSendFunctionCall(typsend, values[i]);
+            pq_sendint(&msg, VARSIZE(outputbytes) - VARHDRSZ, 4);
+            pq_sendbytes(&msg, VARDATA(outputbytes), VARSIZE(outputbytes) - VARHDRSZ);
+            pfree(outputbytes);
+        }
+    }
+    pq_sendint(&msg, 1, 2); /* number of result column format codes */
+    pq_sendint(&msg, 1, 2);
+    pq_endmessage(&msg);
+    pq_stop_redirect_to_shm_mq();
+
+    shm_mq_receive_stringinfo(session->response_qh, &msg);
+    msgtype = pq_getmsgbyte(&msg);
+
+    switch (msgtype)
+    {
+        case '2':
+            break;
+        case 'E':
+            rethrow_errornotice(&msg);
+            break;
+        default:
+            invalid_protocol_message(msgtype);
+            break;
+    }
+
+    pq_redirect_to_shm_mq(session->seg, session->command_qh);
+    pq_beginmessage(&msg, 'E');
+    pq_sendstring(&msg, "");
+    pq_sendint(&msg, 0, 4);
+    pq_endmessage(&msg);
+    pq_stop_redirect_to_shm_mq();
+
+    result = palloc0(sizeof(*result));
+    result->tupdesc = stmt->tupdesc;
+
+    do
+    {
+        shm_mq_receive_stringinfo(session->response_qh, &msg);
+        msgtype = pq_getmsgbyte(&msg);
+
+        switch (msgtype)
+        {
+            case 'A':
+                forward_NotifyResponse(&msg);
+                break;
+            case 'C':
+                {
+                    const char *tag = pq_getmsgstring(&msg);
+                    result->command = pstrdup(tag);
+                    pq_getmsgend(&msg);
+                    break;
+                }
+            case 'D':
+                if (!stmt->tupdesc)
+                    elog(ERROR, "did not expect any rows");
+                result->tuples = lappend(result->tuples, HeapTuple_from_DataRow(stmt->tupdesc, &msg));
+                pq_getmsgend(&msg);
+                break;
+            case 'E':
+            case 'N':
+                rethrow_errornotice(&msg);
+                break;
+            default:
+                invalid_protocol_message(msgtype);
+                break;
+        }
+    }
+    while (msgtype != 'C');
+
+    pq_redirect_to_shm_mq(session->seg, session->command_qh);
+    pq_putemptymessage('S');
+    pq_stop_redirect_to_shm_mq();
+
+    shm_mq_receive_stringinfo(session->response_qh, &msg);
+    msgtype = pq_getmsgbyte(&msg);
+
+    switch (msgtype)
+    {
+        case 'A':
+            forward_NotifyResponse(&msg);
+            break;
+        case 'Z':
+            session->transaction_status = pq_getmsgbyte(&msg);
+            pq_getmsgend(&msg);
+            break;
+        default:
+            invalid_protocol_message(msgtype);
+            break;
+    }
+
+    return result;
+}
+
+
+static void
+bgsession_worker_main(Datum main_arg)
+{
+    dsm_segment *seg;
+    shm_toc *toc;
+    background_session_fixed_data *fdata;
+    char *gucstate;
+    shm_mq *command_mq;
+    shm_mq *response_mq;
+    shm_mq_handle *command_qh;
+    shm_mq_handle *response_qh;
+    StringInfoData msg;
+    char msgtype;
+
+    pqsignal(SIGTERM, die);
+    BackgroundWorkerUnblockSignals();
+
+    /* Set up a memory context and resource owner. */
+    Assert(CurrentResourceOwner == NULL);
+    CurrentResourceOwner = ResourceOwnerCreate(NULL, "background session worker");
+    CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+                                                 "background session",
+                                                 ALLOCSET_DEFAULT_MINSIZE,
+                                                 ALLOCSET_DEFAULT_INITSIZE,
+                                                 ALLOCSET_DEFAULT_MAXSIZE);
+
+    seg = dsm_attach(DatumGetInt32(main_arg));
+    if (seg == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("could not map dynamic shared memory segment")));
+
+    toc = shm_toc_attach(BGSESSION_MAGIC, dsm_segment_address(seg));
+    if (toc == NULL)
+        ereport(ERROR,
+                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                 errmsg("bad magic number in dynamic shared memory segment")));
+
+    /* Find data structures in dynamic shared memory. */
+    fdata = shm_toc_lookup(toc, BGSESSION_KEY_FIXED_DATA);
+
+    gucstate = shm_toc_lookup(toc, BGSESSION_KEY_GUC);
+
+    command_mq = shm_toc_lookup(toc, BGSESSION_KEY_COMMAND_QUEUE);
+    shm_mq_set_receiver(command_mq, MyProc);
+    command_qh = shm_mq_attach(command_mq, seg, NULL);
+
+    response_mq = shm_toc_lookup(toc, BGSESSION_KEY_RESPONSE_QUEUE);
+    shm_mq_set_sender(response_mq, MyProc);
+    response_qh = shm_mq_attach(response_mq, seg, NULL);
+
+    pq_redirect_to_shm_mq(seg, response_qh);
+    BackgroundWorkerInitializeConnectionByOid(fdata->database_id,
+                                              fdata->authenticated_user_id);
+
+    SetClientEncoding(GetDatabaseEncoding());
+
+    StartTransactionCommand();
+    RestoreGUCState(gucstate);
+    CommitTransactionCommand();
+
+    process_session_preload_libraries();
+
+    SetUserIdAndSecContext(fdata->current_user_id, fdata->sec_context);
+
+    whereToSendOutput = DestRemote;
+    ReadyForQuery(whereToSendOutput);
+
+    MessageContext = AllocSetContextCreate(TopMemoryContext,
+                                           "MessageContext",
+                                           ALLOCSET_DEFAULT_MINSIZE,
+                                           ALLOCSET_DEFAULT_INITSIZE,
+                                           ALLOCSET_DEFAULT_MAXSIZE);
+
+    do
+    {
+        MemoryContextSwitchTo(MessageContext);
+        MemoryContextResetAndDeleteChildren(MessageContext);
+
+        ProcessCompletedNotifies();
+        pgstat_report_stat(false);
+        pgstat_report_activity(STATE_IDLE, NULL);
+
+        shm_mq_receive_stringinfo(command_qh, &msg);
+        msgtype = pq_getmsgbyte(&msg);
+
+        switch (msgtype)
+        {
+            case 'B':
+                {
+                    SetCurrentStatementStartTimestamp();
+                    exec_bind_message(&msg);
+                    break;
+                }
+            case 'D':
+                {
+                    int describe_type;
+                    const char *describe_target;
+
+                    SetCurrentStatementStartTimestamp();
+
+                    describe_type = pq_getmsgbyte(&msg);
+                    describe_target = pq_getmsgstring(&msg);
+                    pq_getmsgend(&msg);
+
+                    switch (describe_type)
+                    {
+                        case 'S':
+                            exec_describe_statement_message(describe_target);
+                            break;
+#ifdef TODO
+                        case 'P':
+                            exec_describe_portal_message(describe_target);
+                            break;
+#endif
+                        default:
+                            ereport(ERROR,
+                                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                     errmsg("invalid DESCRIBE message subtype %d",
+                                            describe_type)));
+                            break;
+                    }
+                }
+                break;
+            case 'E':
+                {
+                    const char *portal_name;
+                    int max_rows;
+
+                    SetCurrentStatementStartTimestamp();
+
+                    portal_name = pq_getmsgstring(&msg);
+                    max_rows = pq_getmsgint(&msg, 4);
+                    pq_getmsgend(&msg);
+
+                    exec_execute_message(portal_name, max_rows);
+                }
+                break;
+
+            case 'P':
+                {
+                    const char *stmt_name;
+                    const char *query_string;
+                    int numParams;
+                    Oid *paramTypes = NULL;
+
+                    SetCurrentStatementStartTimestamp();
+
+                    stmt_name = pq_getmsgstring(&msg);
+                    query_string = pq_getmsgstring(&msg);
+                    numParams = pq_getmsgint(&msg, 2);
+                    if (numParams > 0)
+                    {
+                        int i;
+
+                        paramTypes = palloc(numParams * sizeof(Oid));
+                        for (i = 0; i < numParams; i++)
+                            paramTypes[i] = pq_getmsgint(&msg, 4);
+                    }
+                    pq_getmsgend(&msg);
+
+                    exec_parse_message(query_string, stmt_name,
+                                       paramTypes, numParams);
+                    break;
+                }
+            case 'Q':
+                {
+                    const char *sql;
+                    int save_log_statement;
+                    bool save_log_duration;
+                    int save_log_min_duration_statement;
+
+                    sql = pq_getmsgstring(&msg);
+                    pq_getmsgend(&msg);
+
+                    /* XXX room for improvement */
+                    save_log_statement = log_statement;
+                    save_log_duration = log_duration;
+                    save_log_min_duration_statement = log_min_duration_statement;
+
+                    check_client_encoding_hook = bgsession_check_client_encoding_hook;
+                    log_statement = LOGSTMT_NONE;
+                    log_duration = false;
+                    log_min_duration_statement = -1;
+
+                    SetCurrentStatementStartTimestamp();
+                    exec_simple_query(sql, 1);
+
+                    log_statement = save_log_statement;
+                    log_duration = save_log_duration;
+                    log_min_duration_statement = save_log_min_duration_statement;
+                    check_client_encoding_hook = NULL;
+
+                    ReadyForQuery(whereToSendOutput);
+                    break;
+                }
+            case 'S':
+                {
+                    pq_getmsgend(&msg);
+                    finish_xact_command();
+                    ReadyForQuery(whereToSendOutput);
+                    break;
+                }
+            case 'X':
+                break;
+            default:
+                ereport(ERROR,
+                        (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                         errmsg("invalid protocol message type from background session leader: %c",
+                                msgtype)));
+                break;
+        }
+    }
+    while (msgtype != 'X');
+}
+
+
+static void
+shm_mq_receive_stringinfo(shm_mq_handle *qh, StringInfoData *msg)
+{
+    shm_mq_result res;
+    Size nbytes;
+    void *data;
+
+    res = shm_mq_receive(qh, &nbytes, &data, false);
+    if (res != SHM_MQ_SUCCESS)
+        elog(ERROR, "shm_mq_receive failed: %d", res);
+
+    initStringInfo(msg);
+    appendBinaryStringInfo(msg, data, nbytes);
+}
+
+
+static void
+bgsession_check_client_encoding_hook(void)
+{
+    elog(ERROR, "cannot set client encoding in background session");
+}
+
+
+static TupleDesc
+TupleDesc_from_RowDescription(StringInfo msg)
+{
+    TupleDesc tupdesc;
+    int natts = pq_getmsgint(msg, 2);
+    int i;
+
+    tupdesc = CreateTemplateTupleDesc(natts, false);
+    for (i = 0; i < natts; i++)
+    {
+        const char *colname;
+        Oid type_oid;
+        int32 typmod;
+        int16 format;
+
+        colname = pq_getmsgstring(msg);
+        (void) pq_getmsgint(msg, 4); /* table OID */
+        (void) pq_getmsgint(msg, 2); /* table attnum */
+        type_oid = pq_getmsgint(msg, 4);
+        (void) pq_getmsgint(msg, 2); /* type
length */ + typmod = pq_getmsgint(msg, 4); + format = pq_getmsgint(msg, 2); + (void) format; +#ifdef TODO + /* XXX The protocol sometimes sends 0 (text) if the format is not + * determined yet. We always use binary, so this check is probably + * not useful. */ + if (format != 1) + elog(ERROR, "format must be binary"); +#endif + + TupleDescInitEntry(tupdesc, i + 1, colname, type_oid, typmod, 0); + } + return tupdesc; +} + + +static HeapTuple +HeapTuple_from_DataRow(TupleDesc tupdesc, StringInfo msg) +{ + int natts = pq_getmsgint(msg, 2); + int i; + Datum *values; + bool *nulls; + StringInfoData buf; + + Assert(tupdesc); + + if (natts != tupdesc->natts) + elog(ERROR, "malformed DataRow"); + + values = palloc(natts * sizeof(*values)); + nulls = palloc(natts * sizeof(*nulls)); + initStringInfo(&buf); + + for (i = 0; i < natts; i++) + { + int32 len = pq_getmsgint(msg, 4); + + if (len < 0) + nulls[i] = true; + else + { + Oid recvid; + Oid typioparams; + + nulls[i] = false; + + getTypeBinaryInputInfo(tupdesc->attrs[i]->atttypid, + &recvid, + &typioparams); + resetStringInfo(&buf); + appendBinaryStringInfo(&buf, pq_getmsgbytes(msg, len), len); + values[i] = OidReceiveFunctionCall(recvid, &buf, typioparams, + tupdesc->attrs[i]->atttypmod); + } + } + + return heap_form_tuple(tupdesc, values, nulls); +} + + +static void +forward_NotifyResponse(StringInfo msg) +{ + int32 pid; + const char *channel; + const char *payload; + + pid = pq_getmsgint(msg, 4); + channel = pq_getmsgrawstring(msg); + payload = pq_getmsgrawstring(msg); + pq_getmsgend(msg); + + NotifyMyFrontEnd(channel, payload, pid); +} + + +static void +rethrow_errornotice(StringInfo msg) +{ + ErrorData edata; + + pq_parse_errornotice(msg, &edata); + edata.elevel = Min(edata.elevel, ERROR); + ThrowErrorData(&edata); +} + + +static void +invalid_protocol_message(char msgtype) +{ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid protocol message type from background session: %c", + msgtype))); +} 
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 599874e..9b607b1 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -180,8 +180,6 @@ static int errdetail_execute(List *raw_parsetree_list); static int errdetail_params(ParamListInfo params); static int errdetail_abort(void); static int errdetail_recovery_conflict(void); -static void start_xact_command(void); -static void finish_xact_command(void); static bool IsTransactionExitStmt(Node *parsetree); static bool IsTransactionExitStmtList(List *parseTrees); static bool IsTransactionStmtList(List *parseTrees); @@ -869,8 +867,8 @@ pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams) * * Execute a "simple Query" protocol message. */ -static void -exec_simple_query(const char *query_string) +void +exec_simple_query(const char *query_string, int16 format) { CommandDest dest = whereToSendOutput; MemoryContext oldcontext; @@ -963,7 +961,6 @@ exec_simple_query(const char *query_string) *plantree_list; Portal portal; DestReceiver *receiver; - int16 format; /* * Get the command name for use in status display (it also becomes the @@ -1054,6 +1051,8 @@ exec_simple_query(const char *query_string) */ PortalStart(portal, NULL, 0, InvalidSnapshot); + if (format < 0) + { /* * Select the appropriate output format: text unless we are doing a * FETCH from a binary cursor. (Pretty grotty to have to do this here @@ -1074,6 +1073,7 @@ exec_simple_query(const char *query_string) format = 1; /* BINARY */ } } + } PortalSetResultFormat(portal, 1, &format); /* @@ -1185,7 +1185,7 @@ exec_simple_query(const char *query_string) * * Execute a "Parse" protocol message. 
*/ -static void +void exec_parse_message(const char *query_string, /* string to execute */ const char *stmt_name, /* name for prepared stmt */ Oid *paramTypes, /* parameter types */ @@ -1447,7 +1447,7 @@ exec_parse_message(const char *query_string, /* string to execute */ * * Process a "Bind" message to create a portal from a prepared statement */ -static void +void exec_bind_message(StringInfo input_message) { const char *portal_name; @@ -1829,7 +1829,7 @@ exec_bind_message(StringInfo input_message) * * Process an "Execute" message for a portal */ -static void +void exec_execute_message(const char *portal_name, long max_rows) { CommandDest dest; @@ -2278,7 +2278,7 @@ errdetail_recovery_conflict(void) * * Process a "Describe" message for a prepared statement */ -static void +void exec_describe_statement_message(const char *stmt_name) { CachedPlanSource *psrc; @@ -2422,7 +2422,7 @@ exec_describe_portal_message(const char *portal_name) /* * Convenience routines for starting/committing a single command. 
*/ -static void +void start_xact_command(void) { if (!xact_started) @@ -2442,7 +2442,7 @@ start_xact_command(void) } } -static void +void finish_xact_command(void) { if (xact_started) @@ -4066,7 +4066,7 @@ PostgresMain(int argc, char *argv[], if (am_walsender) exec_replication_command(query_string); else - exec_simple_query(query_string); + exec_simple_query(query_string, -1); send_ready_for_query = true; } diff --git a/src/include/commands/variable.h b/src/include/commands/variable.h index 8105951..73faff7 100644 --- a/src/include/commands/variable.h +++ b/src/include/commands/variable.h @@ -29,6 +29,7 @@ extern bool check_transaction_deferrable(bool *newval, void **extra, GucSource s extern bool check_random_seed(double *newval, void **extra, GucSource source); extern void assign_random_seed(double newval, void *extra); extern const char *show_random_seed(void); +extern void (*check_client_encoding_hook)(void); extern bool check_client_encoding(char **newval, void **extra, GucSource source); extern void assign_client_encoding(const char *newval, void *extra); extern bool check_session_authorization(char **newval, void **extra, GucSource source); diff --git a/src/include/libpq/pqmq.h b/src/include/libpq/pqmq.h index 8c03acb..6cc0090 100644 --- a/src/include/libpq/pqmq.h +++ b/src/include/libpq/pqmq.h @@ -17,6 +17,7 @@ #include "storage/shm_mq.h" extern void pq_redirect_to_shm_mq(dsm_segment *seg, shm_mq_handle *mqh); +extern void pq_stop_redirect_to_shm_mq(void); extern void pq_set_parallel_master(pid_t pid, BackendId backend_id); extern void pq_parse_errornotice(StringInfo str, ErrorData *edata); diff --git a/src/include/tcop/bgsession.h b/src/include/tcop/bgsession.h new file mode 100644 index 0000000..70dad45 --- /dev/null +++ b/src/include/tcop/bgsession.h @@ -0,0 +1,26 @@ +#ifndef BGSESSION_H +#define BGSESSION_H + +#include "access/tupdesc.h" +#include "nodes/pg_list.h" + +struct BackgroundSession; +typedef struct BackgroundSession BackgroundSession; + 
+struct BackgroundSessionPreparedStatement; +typedef struct BackgroundSessionPreparedStatement BackgroundSessionPreparedStatement; + +typedef struct BackgroundSessionResult +{ + TupleDesc tupdesc; + List *tuples; + const char *command; +} BackgroundSessionResult; + +BackgroundSession *BackgroundSessionStart(void); +void BackgroundSessionEnd(BackgroundSession *session); +BackgroundSessionResult *BackgroundSessionExecute(BackgroundSession *session, const char *sql); +BackgroundSessionPreparedStatement *BackgroundSessionPrepare(BackgroundSession *session, const char *sql, int nargs, Oid argtypes[], const char *argnames[]); +BackgroundSessionResult *BackgroundSessionExecutePrepared(BackgroundSessionPreparedStatement *stmt, int nargs, Datum values[], bool nulls[]); + +#endif /* BGSESSION_H */ diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h index 7254355..8f2ff8a 100644 --- a/src/include/tcop/tcopprot.h +++ b/src/include/tcop/tcopprot.h @@ -57,6 +57,12 @@ extern PlannedStmt *pg_plan_query(Query *querytree, int cursorOptions, ParamListInfo boundParams); extern List *pg_plan_queries(List *querytrees, int cursorOptions, ParamListInfo boundParams); +extern void exec_simple_query(const char *query_string, int16 format); +extern void exec_parse_message(const char *query_string, const char *stmt_name, + Oid paramTypes[], int numParams); +extern void exec_bind_message(StringInfo input_message); +extern void exec_execute_message(const char *portal_name, long max_rows); +extern void exec_describe_statement_message(const char *stmt_name); extern bool check_max_stack_depth(int *newval, void **extra, GucSource source); extern void assign_max_stack_depth(int newval, void *extra); @@ -70,6 +76,9 @@ extern void RecoveryConflictInterrupt(ProcSignalReason reason); /* called from S extern void ProcessClientReadInterrupt(bool blocked); extern void ProcessClientWriteInterrupt(bool blocked); +extern void start_xact_command(void); +extern void finish_xact_command(void); 
+ extern void process_postgres_switches(int argc, char *argv[], GucContext ctx, const char **dbname); extern void PostgresMain(int argc, char *argv[], diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile index 7680d49..9895a6e 100644 --- a/src/pl/plpython/Makefile +++ b/src/pl/plpython/Makefile @@ -20,6 +20,7 @@ PGFILEDESC = "PL/Python - procedural language" NAME = plpython$(python_majorversion) OBJS = \ + plpy_bgsession.o \ plpy_cursorobject.o \ plpy_elog.o \ plpy_exec.o \ @@ -89,6 +90,7 @@ REGRESS = \ plpython_quote \ plpython_composite \ plpython_subtransaction \ + plpython_bgsession \ plpython_drop REGRESS_PLPYTHON3_MANGLE := $(REGRESS) diff --git a/src/pl/plpython/expected/plpython_bgsession.out b/src/pl/plpython/expected/plpython_bgsession.out new file mode 100644 index 0000000..b8dff07 --- /dev/null +++ b/src/pl/plpython/expected/plpython_bgsession.out @@ -0,0 +1,188 @@ +CREATE TABLE test1 (a int, b text); +CREATE FUNCTION bgsession_test() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + for i in range(0, 10): + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + a.execute("COMMIT") + else: + a.execute("ROLLBACK") + +return 42 +$$; +SELECT bgsession_test(); + bgsession_test +---------------- + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | +(5 rows) + +CREATE FUNCTION bgsession_test2() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (11)") + rv = a.execute("SELECT * FROM test1") + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; +SELECT bgsession_test2(); +INFO: <PLyResult status=5 nrows=6 rows=[{'a': 0, 'b': None}, {'a': 2, 'b': None}, {'a': 4, 'b': None}, {'a': 6, 'b': None}, {'a': 8, 'b': None}, {'a': 11, 'b': None}]> + bgsession_test2 +----------------- + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 0 | + 2 | + 4 | + 6 | + 8 | 
+(5 rows) + +CREATE FUNCTION bgsession_test3() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$") + a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$") + +return 42 +$$; +SELECT bgsession_test3(); +NOTICE: notice +ERROR: error +CONTEXT: PL/pgSQL function inline_code_block line 1 at RAISE +PL/Python function "bgsession_test3" +CREATE FUNCTION bgsession_test4() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("SET client_encoding TO SJIS") + +return 42 +$$; +SELECT bgsession_test4(); +ERROR: cannot set client encoding in background session +CONTEXT: PL/Python function "bgsession_test4" +TRUNCATE test1; +CREATE FUNCTION bgsession_test5() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"]) + a.execute_prepared(plan, [1, "one"]) + a.execute_prepared(plan, [2, "two"]) + +return 42 +$$; +SELECT bgsession_test5(); + bgsession_test5 +----------------- + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+----- + 1 | one + 2 | two +(2 rows) + +TRUNCATE test1; +CREATE FUNCTION bgsession_test7() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"]) + a.execute_prepared(plan, [11]) + plan = a.prepare("SELECT * FROM test1") + rv = a.execute_prepared(plan, []) + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; +SELECT bgsession_test7(); +INFO: <PLyResult status=5 nrows=1 rows=[{'a': 11, 'b': None}]> + bgsession_test7 +----------------- + 42 +(1 row) + +SELECT * FROM test1; + a | b +---+--- +(0 rows) + +CREATE FUNCTION bgsession_test8() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + +return 42 +$$; +SELECT bgsession_test8(); +ERROR: background session ended with 
transaction block open +CONTEXT: PL/Python function "bgsession_test8" +TRUNCATE test1; +CREATE FUNCTION bgsession_test9a() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = plpy.BackgroundSession() +GD['bg'] = bg +bg.execute("BEGIN") +bg.execute("INSERT INTO test1 VALUES (1)") + +return 1 +$$; +CREATE FUNCTION bgsession_test9b() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = GD['bg'] +bg.execute("INSERT INTO test1 VALUES (2)") +bg.execute("COMMIT") +bg.close() + +return 2 +$$; +SELECT bgsession_test9a(); + bgsession_test9a +------------------ + 1 +(1 row) + +SELECT bgsession_test9b(); + bgsession_test9b +------------------ + 2 +(1 row) + +SELECT * FROM test1; + a | b +---+--- + 1 | + 2 | +(2 rows) + +DROP TABLE test1; diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out index 847e4cc..2001b60 100644 --- a/src/pl/plpython/expected/plpython_test.out +++ b/src/pl/plpython/expected/plpython_test.out @@ -43,8 +43,9 @@ contents.sort() return contents $$ LANGUAGE plpythonu; select module_contents(); - module_contents ------------------ + module_contents +------------------- + BackgroundSession Error Fatal SPIError @@ -63,7 +64,7 @@ select module_contents(); spiexceptions subtransaction warning -(18 rows) +(19 rows) CREATE FUNCTION elog_test_basic() RETURNS void AS $$ diff --git a/src/pl/plpython/plpy_bgsession.c b/src/pl/plpython/plpy_bgsession.c new file mode 100644 index 0000000..68f7207 --- /dev/null +++ b/src/pl/plpython/plpy_bgsession.c @@ -0,0 +1,454 @@ +/* + * the PLyBackgroundSession class + * + * src/pl/plpython/plpy_bgsession.c + */ + +#include "postgres.h" + +#include "access/xact.h" +#include "executor/spi.h" +#include "parser/parse_type.h" +#include "utils/memutils.h" +#include "utils/syscache.h" + +#include "plpython.h" + +#include "plpy_bgsession.h" + +#include "plpy_elog.h" +#include "plpy_main.h" +#include "plpy_planobject.h" +#include "plpy_spi.h" + + +static PyObject *PLyBackgroundSession_new(PyTypeObject 
*type, PyObject *args, PyObject *kw); +static void PLyBackgroundSession_dealloc(PyObject *subxact); +static PyObject *PLyBackgroundSession_enter(PyObject *self, PyObject *unused); +static PyObject *PLyBackgroundSession_close(PyObject *self, PyObject *args); +static PyObject *PLyBackgroundSession_execute(PyObject *self, PyObject *args); +static PyObject *PLyBackgroundSession_prepare(PyObject *self, PyObject *args); +static PyObject *PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args); + +static char PLyBackgroundSession_doc[] = { + "PostgreSQL background session context manager" +}; + +static PyMethodDef PLyBackgroundSession_methods[] = { + {"close", PLyBackgroundSession_close, METH_VARARGS, NULL}, + {"__enter__", PLyBackgroundSession_enter, METH_VARARGS, NULL}, + {"__exit__", PLyBackgroundSession_close, METH_VARARGS, NULL}, + {"execute", PLyBackgroundSession_execute, METH_VARARGS, NULL}, + {"prepare", PLyBackgroundSession_prepare, METH_VARARGS, NULL}, + {"execute_prepared", PLyBackgroundSession_execute_prepared, METH_VARARGS, NULL}, + {NULL, NULL, 0, NULL} +}; + +static PyTypeObject PLyBackgroundSession_Type = { + PyVarObject_HEAD_INIT(NULL, 0) + "plpy.BackgroundSession", /* tp_name */ + sizeof(PLyBackgroundSession_Object), /* tp_size */ + 0, /* tp_itemsize */ + + /* + * methods + */ + PLyBackgroundSession_dealloc, /* tp_dealloc */ + 0, /* tp_print */ + 0, /* tp_getattr */ + 0, /* tp_setattr */ + 0, /* tp_compare */ + 0, /* tp_repr */ + 0, /* tp_as_number */ + 0, /* tp_as_sequence */ + 0, /* tp_as_mapping */ + 0, /* tp_hash */ + 0, /* tp_call */ + 0, /* tp_str */ + 0, /* tp_getattro */ + 0, /* tp_setattro */ + 0, /* tp_as_buffer */ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, /* tp_flags */ + PLyBackgroundSession_doc, /* tp_doc */ + 0, /* tp_traverse */ + 0, /* tp_clear */ + 0, /* tp_richcompare */ + 0, /* tp_weaklistoffset */ + 0, /* tp_iter */ + 0, /* tp_iternext */ + PLyBackgroundSession_methods, /* tp_tpmethods */ + 0, /* tp_members */ + 0, /* 
tp_getset */ + 0, /* tp_base */ + 0, /* tp_dict */ + 0, /* tp_descr_get */ + 0, /* tp_descr_set */ + 0, /* tp_dictoffset */ + 0, /* tp_init */ + 0, /* tp_alloc */ + PLyBackgroundSession_new, /* tp_new */ + 0, /* tp_free */ +}; + + +int +PLy_bgsession_init_type(PyObject *module) +{ + if (PyType_Ready(&PLyBackgroundSession_Type) < 0) + return -1; + + Py_INCREF(&PLyBackgroundSession_Type); + if (PyModule_AddObject(module, "BackgroundSession", (PyObject *)&PLyBackgroundSession_Type) < 0) + return -1; + + return 0; +} + +static PyObject * +PLyBackgroundSession_new(PyTypeObject *type, PyObject *args, PyObject *kw) +{ + PyObject *result = type->tp_alloc(type, 0); + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) result; + + bgsession->bgsession = BackgroundSessionStart(); + + return result; +} + +/* + * Python requires a dealloc function to be defined + */ +static void +PLyBackgroundSession_dealloc(PyObject *self) +{ +} + +/* + * bgsession.__enter__() or bgsession.enter() + */ +static PyObject * +PLyBackgroundSession_enter(PyObject *self, PyObject *unused) +{ + Py_INCREF(self); + return self; +} + +/* + * bgsession.close() or bgsession.__exit__(exc_type, exc, tb) + */ +static PyObject * +PLyBackgroundSession_close(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + BackgroundSessionEnd(bgsession->bgsession); + bgsession->bgsession = NULL; + + Py_INCREF(Py_None); + return Py_None; +} + +static PyObject * +PLyBackgroundSession_execute(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self; + char *query; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + if (PyArg_ParseTuple(args, 
"s:execute", &query)) + { + BackgroundSessionResult *result; + HeapTuple *tuples; + ListCell *lc; + int i; + SPITupleTable faketupletable; + + result = BackgroundSessionExecute(bgsession->bgsession, query); + if (result->tupdesc) + { + tuples = palloc(list_length(result->tuples) * sizeof(*tuples)); + i = 0; + foreach (lc, result->tuples) + { + HeapTuple tuple = (HeapTuple) lfirst(lc); + tuples[i++] = tuple; + } + faketupletable.tupdesc = result->tupdesc; + faketupletable.vals = tuples; + return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT); + } + else + return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY); + } + else + PLy_exception_set(PLy_exc_error, "background session execute expected a query"); + return NULL; +} + +// XXX lots of overlap with PLy_spi_prepare +static PyObject * +PLyBackgroundSession_prepare(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession = (PLyBackgroundSession_Object *) self; + char *query; + PyObject *paraminfo = NULL; + BackgroundSessionPreparedStatement *bgstmt; + int nargs = 0; + const char **argnames = NULL; + PLyPlanObject *plan; + PyObject *volatile optr = NULL; + volatile MemoryContext oldcontext; + int i; + PLyExecutionContext *exec_ctx = PLy_current_execution_context(); + PyObject *keys; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + if (!PyArg_ParseTuple(args, "s|O:prepare", &query, ¶minfo)) + return NULL; + + if (paraminfo && + !PySequence_Check(paraminfo) && !PyMapping_Check(paraminfo)) + { + PLy_exception_set(PyExc_TypeError, + "second argument of prepare must be a sequence or mapping"); + return NULL; + } + + if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL) + return NULL; + + plan->mcxt = AllocSetContextCreate(TopMemoryContext, + "PL/Python background plan context", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + 
ALLOCSET_DEFAULT_MAXSIZE); + + oldcontext = MemoryContextSwitchTo(plan->mcxt); + + if (!paraminfo) + nargs = 0; + else if (PySequence_Check(paraminfo)) + nargs = PySequence_Length(paraminfo); + else + nargs = PyMapping_Length(paraminfo); + + plan->nargs = nargs; + plan->types = nargs ? palloc(sizeof(Oid) * nargs) : NULL; + plan->values = nargs ? palloc(sizeof(Datum) * nargs) : NULL; + plan->args = nargs ? palloc(sizeof(PLyTypeInfo) * nargs) : NULL; + + MemoryContextSwitchTo(oldcontext); + + if (PyMapping_Check(paraminfo)) + { + argnames = palloc(nargs * sizeof(char *)); + keys = PyMapping_Keys(paraminfo); + } + else + { + argnames = NULL; + keys = NULL; + } + + for (i = 0; i < nargs; i++) + { + PLy_typeinfo_init(&plan->args[i], plan->mcxt); + plan->values[i] = PointerGetDatum(NULL); + } + + for (i = 0; i < nargs; i++) + { + char *sptr; + HeapTuple typeTup; + Oid typeId; + int32 typmod; + + if (keys) + { + PyObject *key; + char *keystr; + + key = PySequence_GetItem(keys, i); + argnames[i] = keystr = PyString_AsString(key); + optr = PyMapping_GetItemString(paraminfo, keystr); + Py_DECREF(key); + } + else + optr = PySequence_GetItem(paraminfo, i); + + if (PyString_Check(optr)) + sptr = PyString_AsString(optr); + else if (PyUnicode_Check(optr)) + sptr = PLyUnicode_AsString(optr); + else + { + ereport(ERROR, + (errmsg("background session prepare: type name at ordinal position %d is not a string", i))); + sptr = NULL; /* keep compiler quiet */ + } + + /******************************************************** + * Resolve argument type names and then look them up by + * oid in the system cache, and remember the required + *information for input conversion. 
+ ********************************************************/ + + parseTypeString(sptr, &typeId, &typmod, false); + + typeTup = SearchSysCache1(TYPEOID, + ObjectIdGetDatum(typeId)); + if (!HeapTupleIsValid(typeTup)) + elog(ERROR, "cache lookup failed for type %u", typeId); + + Py_DECREF(optr); + + /* + * set optr to NULL, so we won't try to unref it again in case of + * an error + */ + optr = NULL; + + plan->types[i] = typeId; + PLy_output_datum_func(&plan->args[i], typeTup, exec_ctx->curr_proc->langid, exec_ctx->curr_proc->trftypes); + ReleaseSysCache(typeTup); + } + + bgstmt = BackgroundSessionPrepare(bgsession->bgsession, query, nargs, plan->types, argnames); + + plan->bgstmt = bgstmt; + + return (PyObject *) plan; +} + +static PyObject * +PLyBackgroundSession_execute_prepared(PyObject *self, PyObject *args) +{ + PLyBackgroundSession_Object *bgsession pg_attribute_unused() = (PLyBackgroundSession_Object *) self; + PyObject *ob; + PLyPlanObject *plan; + PyObject *list = NULL; + int nargs; + bool *nulls; + BackgroundSessionResult *result; + HeapTuple *tuples; + ListCell *lc; + int i; + SPITupleTable faketupletable; + + if (!bgsession->bgsession) + { + PLy_exception_set(PyExc_ValueError, "this background session has already been closed"); + return NULL; + } + + if (!PyArg_ParseTuple(args, "O|O:execute_prepared", &ob, &list)) + return NULL; + + if (!is_PLyPlanObject(ob)) + { + PLy_exception_set(PyExc_TypeError, + "first argument of execute_prepared must be a plan"); + return NULL; + } + + plan = (PLyPlanObject *) ob; + + if (list && (!PySequence_Check(list))) + { + PLy_exception_set(PyExc_TypeError, + "second argument of execute_prepared must be a sequence"); + return NULL; + } + + nargs = list ? 
PySequence_Length(list) : 0; + + if (nargs != plan->nargs) + { + char *sv; + PyObject *so = PyObject_Str(list); + + if (!so) + PLy_elog(ERROR, "could not execute plan"); + sv = PyString_AsString(so); + PLy_exception_set_plural(PyExc_TypeError, + "Expected sequence of %d argument, got %d: %s", + "Expected sequence of %d arguments, got %d: %s", + plan->nargs, + plan->nargs, nargs, sv); + Py_DECREF(so); + + return NULL; + } + + nulls = palloc(nargs * sizeof(*nulls)); + + for (i = 0; i < nargs; i++) + { + PyObject *elem; + + elem = PySequence_GetItem(list, i); + if (elem != Py_None) + { + PG_TRY(); + { + plan->values[i] = + plan->args[i].out.d.func(&(plan->args[i].out.d), + -1, + elem, + false); + } + PG_CATCH(); + { + Py_DECREF(elem); + PG_RE_THROW(); + } + PG_END_TRY(); + + Py_DECREF(elem); + nulls[i] = false; + } + else + { + Py_DECREF(elem); + plan->values[i] = + InputFunctionCall(&(plan->args[i].out.d.typfunc), + NULL, + plan->args[i].out.d.typioparam, + -1); + nulls[i] = true; + } + } + + result = BackgroundSessionExecutePrepared(plan->bgstmt, nargs, plan->values, nulls); + if (result->tupdesc) + { + tuples = palloc(list_length(result->tuples) * sizeof(*tuples)); + i = 0; + foreach (lc, result->tuples) + { + HeapTuple tuple = (HeapTuple) lfirst(lc); + tuples[i++] = tuple; + } + faketupletable.tupdesc = result->tupdesc; + faketupletable.vals = tuples; + return PLy_spi_execute_fetch_result(&faketupletable, list_length(result->tuples), SPI_OK_SELECT); + } + else + return PLy_spi_execute_fetch_result(NULL, 0, SPI_OK_UTILITY); +} diff --git a/src/pl/plpython/plpy_bgsession.h b/src/pl/plpython/plpy_bgsession.h new file mode 100644 index 0000000..39daca2 --- /dev/null +++ b/src/pl/plpython/plpy_bgsession.h @@ -0,0 +1,18 @@ +/* + * src/pl/plpython/plpy_bgsession.h + */ + +#ifndef PLPY_BGSESSION_H +#define PLPY_BGSESSION_H + +#include "tcop/bgsession.h" + +typedef struct PLyBackgroundSession_Object +{ + PyObject_HEAD + BackgroundSession *bgsession; +} 
PLyBackgroundSession_Object; + +extern int PLy_bgsession_init_type(PyObject *module); + +#endif /* PLPY_BGSESSION_H */ diff --git a/src/pl/plpython/plpy_main.h b/src/pl/plpython/plpy_main.h index 10426c4..b7ee34c 100644 --- a/src/pl/plpython/plpy_main.h +++ b/src/pl/plpython/plpy_main.h @@ -7,6 +7,8 @@ #include "plpy_procedure.h" +#include "tcop/bgsession.h" + /* the interpreter's globals dict */ extern PyObject *PLy_interp_globals; @@ -19,6 +21,7 @@ typedef struct PLyExecutionContext { PLyProcedure *curr_proc; /* the currently executing procedure */ MemoryContext scratch_ctx; /* a context for things like type I/O */ + BackgroundSession *bgsession; struct PLyExecutionContext *next; /* previous stack level */ } PLyExecutionContext; diff --git a/src/pl/plpython/plpy_planobject.c b/src/pl/plpython/plpy_planobject.c index 16c39a0..006a3af 100644 --- a/src/pl/plpython/plpy_planobject.c +++ b/src/pl/plpython/plpy_planobject.c @@ -77,6 +77,7 @@ PLy_plan_new(void) return NULL; ob->plan = NULL; + ob->bgstmt = NULL; ob->nargs = 0; ob->types = NULL; ob->values = NULL; diff --git a/src/pl/plpython/plpy_planobject.h b/src/pl/plpython/plpy_planobject.h index c675592..fba814a 100644 --- a/src/pl/plpython/plpy_planobject.h +++ b/src/pl/plpython/plpy_planobject.h @@ -6,6 +6,7 @@ #define PLPY_PLANOBJECT_H #include "executor/spi.h" +#include "tcop/bgsession.h" #include "plpy_typeio.h" @@ -13,6 +14,7 @@ typedef struct PLyPlanObject { PyObject_HEAD SPIPlanPtr plan; + BackgroundSessionPreparedStatement *bgstmt; int nargs; Oid *types; Datum *values; diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c index d80dc51..06848bc 100644 --- a/src/pl/plpython/plpy_plpymodule.c +++ b/src/pl/plpython/plpy_plpymodule.c @@ -13,6 +13,7 @@ #include "plpy_plpymodule.h" +#include "plpy_bgsession.h" #include "plpy_cursorobject.h" #include "plpy_elog.h" #include "plpy_planobject.h" @@ -134,6 +135,8 @@ PyInit_plpy(void) return NULL; PLy_add_exceptions(m); + if 
(PLy_bgsession_init_type(m) < 0) + return NULL; return m; } @@ -164,6 +167,8 @@ PLy_init_plpy(void) #else plpy = Py_InitModule("plpy", PLy_methods); PLy_add_exceptions(plpy); + if (PLy_bgsession_init_type(plpy) < 0) + PLy_elog(ERROR, "could not initialize BackgroundSession type"); #endif /* PyDict_SetItemString(plpy, "PlanType", (PyObject *) &PLy_PlanType); */ diff --git a/src/pl/plpython/plpy_spi.c b/src/pl/plpython/plpy_spi.c index b082d01..ec9eeec 100644 --- a/src/pl/plpython/plpy_spi.c +++ b/src/pl/plpython/plpy_spi.c @@ -31,8 +31,6 @@ static PyObject *PLy_spi_execute_query(char *query, long limit); static PyObject *PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit); -static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, - uint64 rows, int status); static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata); @@ -292,6 +290,7 @@ PLy_spi_execute_plan(PyObject *ob, PyObject *list, long limit) rv = SPI_execute_plan(plan->plan, plan->values, nulls, exec_ctx->curr_proc->fn_readonly, limit); ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + SPI_freetuptable(SPI_tuptable); if (nargs > 0) pfree(nulls); @@ -361,6 +360,7 @@ PLy_spi_execute_query(char *query, long limit) pg_verifymbstr(query, strlen(query), false); rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit); ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); + SPI_freetuptable(SPI_tuptable); PLy_spi_subtransaction_commit(oldcontext, oldowner); } @@ -383,7 +383,7 @@ PLy_spi_execute_query(char *query, long limit) return ret; } -static PyObject * +PyObject * PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) { PLyResultObject *result; @@ -470,7 +470,6 @@ PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) PG_END_TRY(); MemoryContextDelete(cxt); - SPI_freetuptable(tuptable); } return (PyObject *) result; diff --git a/src/pl/plpython/plpy_spi.h
b/src/pl/plpython/plpy_spi.h index b042794..9ed37e5 100644 --- a/src/pl/plpython/plpy_spi.h +++ b/src/pl/plpython/plpy_spi.h @@ -5,12 +5,15 @@ #ifndef PLPY_SPI_H #define PLPY_SPI_H +#include "executor/spi.h" #include "utils/palloc.h" #include "utils/resowner.h" extern PyObject *PLy_spi_prepare(PyObject *self, PyObject *args); extern PyObject *PLy_spi_execute(PyObject *self, PyObject *args); +extern PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status); + typedef struct PLyExceptionEntry { int sqlstate; /* hash key, must be first */ diff --git a/src/pl/plpython/sql/plpython_bgsession.sql b/src/pl/plpython/sql/plpython_bgsession.sql new file mode 100644 index 0000000..5c33ab2 --- /dev/null +++ b/src/pl/plpython/sql/plpython_bgsession.sql @@ -0,0 +1,148 @@ +CREATE TABLE test1 (a int, b text); + +CREATE FUNCTION bgsession_test() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + for i in range(0, 10): + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (%d)" % i) + if i % 2 == 0: + a.execute("COMMIT") + else: + a.execute("ROLLBACK") + +return 42 +$$; + +SELECT bgsession_test(); + +SELECT * FROM test1; + + +CREATE FUNCTION bgsession_test2() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + a.execute("INSERT INTO test1 (a) VALUES (11)") + rv = a.execute("SELECT * FROM test1") + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; + +SELECT bgsession_test2(); + +SELECT * FROM test1; + + +CREATE FUNCTION bgsession_test3() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("DO $_$ BEGIN RAISE NOTICE 'notice'; END $_$") + a.execute("DO $_$ BEGIN RAISE EXCEPTION 'error'; END $_$") + +return 42 +$$; + +SELECT bgsession_test3(); + + +CREATE FUNCTION bgsession_test4() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("SET client_encoding TO SJIS") + +return 42 +$$; + 
+SELECT bgsession_test4(); + + +TRUNCATE test1; + +CREATE FUNCTION bgsession_test5() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + plan = a.prepare("INSERT INTO test1 (a, b) VALUES ($1, $2)", ["int4", "text"]) + a.execute_prepared(plan, [1, "one"]) + a.execute_prepared(plan, [2, "two"]) + +return 42 +$$; + +SELECT bgsession_test5(); + +SELECT * FROM test1; + + +TRUNCATE test1; + +CREATE FUNCTION bgsession_test7() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + plan = a.prepare("INSERT INTO test1 (a) VALUES ($1)", ["int4"]) + a.execute_prepared(plan, [11]) + plan = a.prepare("SELECT * FROM test1") + rv = a.execute_prepared(plan, []) + plpy.info(rv) + a.execute("ROLLBACK") + +return 42 +$$; + +SELECT bgsession_test7(); + +SELECT * FROM test1; + + +CREATE FUNCTION bgsession_test8() RETURNS integer +LANGUAGE plpythonu +AS $$ +with plpy.BackgroundSession() as a: + a.execute("BEGIN") + +return 42 +$$; + +SELECT bgsession_test8(); + + +TRUNCATE test1; + +CREATE FUNCTION bgsession_test9a() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = plpy.BackgroundSession() +GD['bg'] = bg +bg.execute("BEGIN") +bg.execute("INSERT INTO test1 VALUES (1)") + +return 1 +$$; + +CREATE FUNCTION bgsession_test9b() RETURNS integer +LANGUAGE plpythonu +AS $$ +bg = GD['bg'] +bg.execute("INSERT INTO test1 VALUES (2)") +bg.execute("COMMIT") +bg.close() + +return 2 +$$; + +SELECT bgsession_test9a(); +SELECT bgsession_test9b(); + +SELECT * FROM test1; + + +DROP TABLE test1; -- 2.10.2
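For reviewers who want a quick feel for what the first regression test (bgsession_test) should leave behind in test1, here is a rough standalone sketch. It is not part of the patch and does not use the patch's API: FakeBackgroundSession is a hypothetical stand-in for plpy.BackgroundSession, and an in-memory sqlite3 connection in autocommit mode stands in for the background worker. It only mirrors the BEGIN / COMMIT / ROLLBACK pattern of the test, assuming ordinary transaction semantics.

```python
import sqlite3


class FakeBackgroundSession:
    """Hypothetical stand-in for plpy.BackgroundSession (assumption, not the patch's class)."""

    def __init__(self, conn):
        self.conn = conn

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Do not swallow exceptions raised inside the with-block.
        return False

    def execute(self, sql):
        # Run one statement and return whatever rows it produced.
        return self.conn.execute(sql).fetchall()


def run_bgsession_test(conn):
    # Same control flow as bgsession_test() in plpython_bgsession.sql.
    with FakeBackgroundSession(conn) as a:
        for i in range(0, 10):
            a.execute("BEGIN")
            a.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
            if i % 2 == 0:
                a.execute("COMMIT")
            else:
                a.execute("ROLLBACK")
    return 42


# isolation_level=None puts sqlite3 in autocommit mode, so the explicit
# BEGIN / COMMIT / ROLLBACK statements above control the transactions.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE test1 (a int, b text)")
result = run_bgsession_test(conn)
rows = [r[0] for r in conn.execute("SELECT a FROM test1 ORDER BY a")]
print(result, rows)
```

Only the even values survive, since every odd iteration is rolled back. The sketch says nothing about how the real background session reports notices or errors, or what happens when a session is closed with a transaction still open (bgsession_test3 and bgsession_test8).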