Here is a patch that implements transaction control in PL/Python procedures. (This patch goes on top of "SQL procedures" patch v1.)
So you can do this:

CREATE PROCEDURE transaction_test1()
LANGUAGE plpythonu
AS $$
for i in range(0, 10):
    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
    if i % 2 == 0:
        plpy.commit()
    else:
        plpy.rollback()
$$;

CALL transaction_test1();

I started with PL/Python because the internal structures there are more manageable. Obviously, people will want this for PL/pgSQL as well, and I have that in the works. It's not in a usable state, but I have found that the work needed is essentially the same as for PL/Python.

I have discovered three groups of obstacles that needed addressing to make this work. At this point, the patch is more of a demo of what these issues are, and if we come to satisfactory solutions for each of them, things should fall into place more easily afterwards.

1) While calling CommitTransactionCommand() in the middle of a utility command works just fine (several utility commands do this, of course), calling AbortCurrentTransaction() in a similar way does not. There are a few pieces of code that assume that a transaction abort will always result in a return to the main control loop, and so they just clean away everything. This is what the changes in portalmem.c are about. Some comments there already hint at the issue. No doubt this will need further refinement. I think it would be desirable in general to separate the control flow concerns from the transaction management concerns more cleanly.

2) SPI needs some work. It thinks that it can clean everything away at transaction end. I have found that instead of TopTransactionContext one can use PortalContext and get a more suitable life cycle for the memory. I have played with some variants to make this configurable (e.g., an argument to SPI_connect()), but that didn't seem very useful. There are some comments indicating that there might not always be a PortalContext, but the existing tests don't seem to mind. (There was a thread recently about making a fake PortalContext for autovacuum, so maybe the current idea is that we make sure there always is a PortalContext.) Maybe we need another context like StatementContext or ProcedureContext.

There also needs to be a way via SPI to end transactions while allowing *some* cleanup to happen but leaving the memory around. I have done that via additional SPI API functions like SPI_commit(), which are then available to PL implementations. I also tried making it possible to call transaction statements directly via SPI_exec() or similar, but that ended up a total disaster. So from the API perspective, I like the current implementation, but the details will no doubt need refinement.

3) The PL implementations themselves allocate memory in transaction-bound contexts for convenience as well. This is usually easy to fix by switching to PortalContext as well. As you see, the PL/Python code part of the patch is actually very small. Changes in other PLs would be similar.

Two issues have not been addressed yet:

A) It would be desirable to be able to run commands such as VACUUM and CREATE INDEX CONCURRENTLY in a procedure. This needs a bit of thinking and standards-lawyering about the semantics, like where exactly transactions begin and end in various combinations. It will probably also need a different entry point into SPI, because SPI_exec cannot handle statements ending transactions. But so far my assessment is that this can be added in a mostly independent way later on.

B) There needs to be some kind of call stack for procedure and function invocations, so that we can track when we are allowed to make transaction-controlling calls. The key term in the SQL standard is "non-atomic execution context", which seems specifically devised to cover this scenario. So, for example, if you have CALL -> CALL -> CALL, the third call can issue a transaction statement. But if you have CALL -> SELECT -> CALL, then the last call cannot, because the SELECT introduces an atomic execution context. I don't know if we have such a thing yet or something that we could easily latch on to.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 1b3318154540d0fe71480ca58938433ecfccabbd Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Tue, 31 Oct 2017 14:48:51 -0400
Subject: [PATCH v1] Transaction control in PL/Python procedures

Add .commit, .rollback, and .start_transaction functions to plpy module
to control transactions in a PL/Python function.  Add similar underlying
functions to SPI.  Some additional cleanup so that transaction commit or
abort doesn't blow away data structures still used by the procedure
call.
---
 src/backend/commands/functioncmds.c               |  3 +
 src/backend/executor/spi.c                        | 71 +++++++++++++++++------
 src/backend/utils/mmgr/portalmem.c                | 39 +++++--------
 src/include/executor/spi.h                        |  4 ++
 src/include/executor/spi_priv.h                   |  1 +
 src/pl/plpython/Makefile                          |  1 +
 src/pl/plpython/expected/plpython_test.out        |  9 ++-
 src/pl/plpython/expected/plpython_transaction.out | 44 ++++++++++++++
 src/pl/plpython/plpy_main.c                       |  2 +-
 src/pl/plpython/plpy_plpymodule.c                 | 38 ++++++++++++
 src/pl/plpython/sql/plpython_transaction.sql      | 36 ++++++++++++
 11 files changed, 201 insertions(+), 47 deletions(-)
 create mode 100644 src/pl/plpython/expected/plpython_transaction.out
 create mode 100644 src/pl/plpython/sql/plpython_transaction.sql

diff --git a/src/backend/commands/functioncmds.c b/src/backend/commands/functioncmds.c
index 1f3156d870..badef78fdf 100644
--- a/src/backend/commands/functioncmds.c
+++ b/src/backend/commands/functioncmds.c
@@ -65,6 +65,7 @@
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
@@ -2255,6 +2256,8 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt)
 							   FUNC_MAX_ARGS,
 							   FUNC_MAX_ARGS)));
 
+	MemoryContextSwitchTo(PortalContext);
+
 	fmgr_info(fexpr->funcid, &flinfo);
 	InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, NULL, NULL);
 
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 40292b86c1..f097f15731 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -92,7 +92,7 @@ SPI_connect(void)
 			elog(ERROR, "SPI stack corrupted");
 		newdepth = 16;
 		_SPI_stack = (_SPI_connection *)
-			MemoryContextAlloc(TopTransactionContext,
+			MemoryContextAlloc(TopMemoryContext,
 							   newdepth * sizeof(_SPI_connection));
 		_SPI_stack_depth = newdepth;
 	}
@@ -133,10 +133,10 @@ SPI_connect(void)
 	 * Perhaps CurTransactionContext would do?  For now it doesn't matter
 	 * because we clean up explicitly in AtEOSubXact_SPI().
 	 */
-	_SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext,
+	_SPI_current->procCxt = AllocSetContextCreate(PortalContext,
 												  "SPI Proc",
 												  ALLOCSET_DEFAULT_SIZES);
-	_SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext,
+	_SPI_current->execCxt = AllocSetContextCreate(_SPI_current->procCxt,
 												  "SPI Exec",
 												  ALLOCSET_DEFAULT_SIZES);
 	/* ... and switch to procedure's context */
@@ -158,8 +158,6 @@ SPI_finish(void)
 	MemoryContextSwitchTo(_SPI_current->savedcxt);
 
 	/* Release memory used in procedure call (including tuptables) */
-	MemoryContextDelete(_SPI_current->execCxt);
-	_SPI_current->execCxt = NULL;
 	MemoryContextDelete(_SPI_current->procCxt);
 	_SPI_current->procCxt = NULL;
 
@@ -181,12 +179,58 @@ SPI_finish(void)
 	return SPI_OK_FINISH;
 }
 
+int
+SPI_start_transaction(void)
+{
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	StartTransactionCommand();
+	MemoryContextSwitchTo(oldcontext);
+	return 0;
+}
+
+
+int
+SPI_commit(void)
+{
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	_SPI_current->internal_xact = true;
+
+	if (ActiveSnapshotSet())
+		PopActiveSnapshot();
+	CommitTransactionCommand();
+	MemoryContextSwitchTo(oldcontext);
+
+	_SPI_current->internal_xact = false;
+
+	return 0;
+}
+
+int
+SPI_rollback(void)
+{
+	MemoryContext oldcontext = CurrentMemoryContext;
+
+	_SPI_current->internal_xact = true;
+
+	AbortCurrentTransaction();
+	MemoryContextSwitchTo(oldcontext);
+
+	_SPI_current->internal_xact = false;
+
+	return 0;
+}
+
 /*
  * Clean up SPI state at transaction commit or abort.
  */
 void
 AtEOXact_SPI(bool isCommit)
 {
+	if (_SPI_current && _SPI_current->internal_xact)
+		return;
+
 	/*
 	 * Note that memory contexts belonging to SPI stack entries will be freed
 	 * automatically, so we can ignore them here.  We just need to restore our
@@ -224,21 +268,10 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
 		if (connection->connectSubid != mySubid)
 			break;				/* couldn't be any underneath it either */
 
-		found = true;
+		if (connection->internal_xact)
+			break;
 
-		/*
-		 * Release procedure memory explicitly (see note in SPI_connect)
-		 */
-		if (connection->execCxt)
-		{
-			MemoryContextDelete(connection->execCxt);
-			connection->execCxt = NULL;
-		}
-		if (connection->procCxt)
-		{
-			MemoryContextDelete(connection->procCxt);
-			connection->procCxt = NULL;
-		}
+		found = true;
 
 		/*
 		 * Pop the stack entry and reset global variables.  Unlike
diff --git a/src/backend/utils/mmgr/portalmem.c b/src/backend/utils/mmgr/portalmem.c
index 89db08464f..d07dc060a5 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -736,11 +736,8 @@ PreCommit_Portals(bool isPrepare)
 /*
  * Abort processing for portals.
  *
- * At this point we reset "active" status and run the cleanup hook if
- * present, but we can't release the portal's memory until the cleanup call.
- *
- * The reason we need to reset active is so that we can replace the unnamed
- * portal, else we'll fail to execute ROLLBACK when it arrives.
+ * At this point we run the cleanup hook if present, but we can't release the
+ * portal's memory until the cleanup call.
  */
 void
 AtAbort_Portals(void)
@@ -754,17 +751,6 @@ AtAbort_Portals(void)
 	{
 		Portal		portal = hentry->portal;
 
-		/*
-		 * See similar code in AtSubAbort_Portals().  This would fire if code
-		 * orchestrating multiple top-level transactions within a portal, such
-		 * as VACUUM, caught errors and continued under the same portal with a
-		 * fresh transaction.  No part of core PostgreSQL functions that way.
-		 * XXX Such code would wish the portal to remain ACTIVE, as in
-		 * PreCommit_Portals().
-		 */
-		if (portal->status == PORTAL_ACTIVE)
-			MarkPortalFailed(portal);
-
 		/*
 		 * Do nothing else to cursors held over from a previous transaction.
 		 */
@@ -799,14 +785,6 @@ AtAbort_Portals(void)
 		 * PortalDrop.
 		 */
 		portal->resowner = NULL;
-
-		/*
-		 * Although we can't delete the portal data structure proper, we can
-		 * release any memory in subsidiary contexts, such as executor state.
-		 * The cleanup hook was the last thing that might have needed data
-		 * there.
-		 */
-		MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
 	}
 }
 
@@ -826,6 +804,19 @@ AtCleanup_Portals(void)
 	{
 		Portal		portal = hentry->portal;
 
+		/*
+		 * Do not touch active portals --- this can only happen in the case of
+		 * a multi-transaction command.
+		 *
+		 * Note however that any resource owner attached to such a portal is
+		 * still going to go away, so don't leave a dangling pointer.
+		 */
+		if (portal->status == PORTAL_ACTIVE)
+		{
+			portal->resowner = NULL;
+			continue;
+		}
+
 		/* Do nothing to cursors held over from a previous transaction */
 		if (portal->createSubid == InvalidSubTransactionId)
 		{
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index acade7e92e..1bf73d8594 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -156,6 +156,10 @@ extern int	SPI_register_relation(EphemeralNamedRelation enr);
 extern int	SPI_unregister_relation(const char *name);
 extern int	SPI_register_trigger_data(TriggerData *tdata);
 
+extern int	SPI_start_transaction(void);
+extern int	SPI_commit(void);
+extern int	SPI_rollback(void);
+
 extern void AtEOXact_SPI(bool isCommit);
 extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid);
 
diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h
index 8fae755418..c12a465712 100644
--- a/src/include/executor/spi_priv.h
+++ b/src/include/executor/spi_priv.h
@@ -36,6 +36,7 @@ typedef struct
 	MemoryContext savedcxt;		/* context of SPI_connect's caller */
 	SubTransactionId connectSubid;	/* ID of connecting subtransaction */
 	QueryEnvironment *queryEnv; /* query environment setup for SPI level */
+	bool		internal_xact;	/* SPI-managed transaction boundary, skip cleanup */
 } _SPI_connection;
 
 /*
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index cc91afebde..d09910835d 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -90,6 +90,7 @@ REGRESS = \
 	plpython_quote \
 	plpython_composite \
 	plpython_subtransaction \
+	plpython_transaction \
 	plpython_drop
 
 REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_test.out b/src/pl/plpython/expected/plpython_test.out
index 847e4cc412..f0a10cc05f 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,11 +43,12 @@ contents.sort()
 return contents
 $$ LANGUAGE plpythonu;
 select module_contents();
- module_contents 
------------------
+  module_contents  
+-------------------
  Error
  Fatal
  SPIError
+ commit
  cursor
  debug
  error
@@ -60,10 +61,12 @@ select module_contents();
  quote_ident
  quote_literal
  quote_nullable
+ rollback
  spiexceptions
+ start_transaction
  subtransaction
  warning
-(18 rows)
+(21 rows)
 
 CREATE FUNCTION elog_test_basic() RETURNS void
 AS $$
diff --git a/src/pl/plpython/expected/plpython_transaction.out b/src/pl/plpython/expected/plpython_transaction.out
new file mode 100644
index 0000000000..64e3abdd60
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_transaction.out
@@ -0,0 +1,44 @@
+CREATE TABLE test1 (a int, b text);
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+CALL transaction_test1();
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+TRUNCATE test1;
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+DROP TABLE test1;
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 7df50c09c8..0d3b4a8fbc 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -424,7 +424,7 @@ PLy_push_execution_context(void)
 	PLyExecutionContext *context;
 
 	context = (PLyExecutionContext *)
-		MemoryContextAlloc(TopTransactionContext, sizeof(PLyExecutionContext));
+		MemoryContextAlloc(PortalContext, sizeof(PLyExecutionContext));
 	context->curr_proc = NULL;
 	context->scratch_ctx = NULL;
 	context->next = PLy_execution_contexts;
diff --git a/src/pl/plpython/plpy_plpymodule.c b/src/pl/plpython/plpy_plpymodule.c
index 759ad44932..a95e66bfee 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -6,8 +6,10 @@
 
 #include "postgres.h"
 
+#include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "utils/builtins.h"
+#include "utils/snapmgr.h"
 
 #include "plpython.h"
 
@@ -41,6 +43,9 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, PyObject *kw);
 static PyObject *PLy_quote_literal(PyObject *self, PyObject *args);
 static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args);
 static PyObject *PLy_quote_ident(PyObject *self, PyObject *args);
+static PyObject *PLy_start_transaction(PyObject *self, PyObject *args);
+static PyObject *PLy_commit(PyObject *self, PyObject *args);
+static PyObject *PLy_rollback(PyObject *self, PyObject *args);
 
 
 /* A list of all known exceptions, generated from backend/utils/errcodes.txt */
@@ -95,6 +100,13 @@ static PyMethodDef PLy_methods[] = {
 	 */
 	{"cursor", PLy_cursor, METH_VARARGS, NULL},
 
+	/*
+	 * transaction control
+	 */
+	{"start_transaction", PLy_start_transaction, METH_NOARGS, NULL},
+	{"commit", PLy_commit, METH_NOARGS, NULL},
+	{"rollback", PLy_rollback, METH_NOARGS, NULL},
+
 	{NULL, NULL, 0, NULL}
 };
 
@@ -577,3 +589,29 @@ PLy_output(volatile int level, PyObject *self, PyObject *args, PyObject *kw)
 	 */
 	Py_RETURN_NONE;
 }
+
+static PyObject *
+PLy_start_transaction(PyObject *self, PyObject *args)
+{
+	SPI_start_transaction();
+
+	Py_RETURN_NONE;
+}
+
+static PyObject *
+PLy_commit(PyObject *self, PyObject *args)
+{
+	SPI_commit();
+	SPI_start_transaction();
+
+	Py_RETURN_NONE;
+}
+
+static PyObject *
+PLy_rollback(PyObject *self, PyObject *args)
+{
+	SPI_rollback();
+	SPI_start_transaction();
+
+	Py_RETURN_NONE;
+}
diff --git a/src/pl/plpython/sql/plpython_transaction.sql b/src/pl/plpython/sql/plpython_transaction.sql
new file mode 100644
index 0000000000..42f191b008
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_transaction.sql
@@ -0,0 +1,36 @@
+CREATE TABLE test1 (a int, b text);
+
+
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+
+CALL transaction_test1();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;

base-commit: ee4673ac071f8352c41cc673299b7ec695f079ff
prerequisite-patch-id: 7b37b5c8905dcab64b9c6b8f98caf81049a569ec
-- 
2.14.3
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers