Here is a patch that implements transaction control in PL/Python
procedures.  (This patch goes on top of "SQL procedures" patch v1.)

So you can do this:

CREATE PROCEDURE transaction_test1()
LANGUAGE plpythonu
AS $$
for i in range(0, 10):
    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
    if i % 2 == 0:
         plpy.commit()
    else:
         plpy.rollback()
$$;

CALL transaction_test1();

I started with PL/Python because the internal structures there are more
manageable.  Obviously, people will want this for PL/pgSQL as well, and
I have that in the works.  It's not in a usable state, but I have found
that the work needed is essentially the same as in PL/Python for example.

I have discovered three groups of obstacles that needed addressing to
make this work.  At this point, the patch is more of a demo of what
these issues are, and if we come to satisfactory solutions for each of
them, things should fall into place more easily afterwards.

1) While calling CommitTransactionCommand() in the middle of a utility
command works just fine (several utility commands do this, of course),
calling AbortCurrentTransaction() in a similar way does not.  There are
a few pieces of code that think that a transaction abort will always
result in a return to the main control loop, and so they will just clean
away everything.  This is what the changes in portalmem.c are about.
Some comments there already hint about the issue.  No doubt this will
need further refinement.  I think it would be desirable in general to
separate the control flow concerns from the transaction management
concerns more cleanly.

2) SPI needs some work.  It thinks that it can clean everything away at
transaction end.  I have found that instead of TopTransactionContext one
can use PortalContext and get a more suitable life cycle for the memory.
 I have played with some variants to make this configurable (e.g.,
argument to SPI_connect()), but that didn't seem very useful.  There are
some comments indicating that there might not always be a PortalContext,
but the existing tests don't seem to mind.  (There was a thread recently
about making a fake PortalContext for autovacuum, so maybe the current
idea is that we make sure there always is a PortalContext.)  Maybe we
need another context like StatementContext or ProcedureContext.

There also needs to be a way via SPI to end transactions and allowing
*some* cleanup to happen but leaving the memory around.  I have done
that via additional SPI API functions like SPI_commit(), which are then
available to PL implementations.  I also tried making it possible
calling transaction statements directly via SPI_exec() or similar, but
that ended up a total disaster.  So from the API perspective, I like the
current implementation, but the details will no doubt need refinement.

3) The PL implementations themselves allocate memory in
transaction-bound contexts for convenience as well.  This is usually
easy to fix by switching to PortalContext as well.  As you see, the
PL/Python code part of the patch is actually very small.  Changes in
other PLs would be similar.

Two issues have not been addressed yet:

A) It would be desirable to be able to run commands such as VACUUM and
CREATE INDEX CONCURRENTLY in a procedure.  This needs a bit of thinking
and standards-lawyering about the semantics, like where exactly do
transactions begin and end in various combinations.  It will probably
also need a different entry point into SPI, because SPI_exec cannot
handle statements ending transactions.  But so far my assessment is that
this can be added in a mostly independent way later on.

B) There needs to be some kind of call stack for procedure and function
invocations, so that we can track when we are allowed to make
transaction controlling calls.  The key term in the SQL standard is
"non-atomic execution context", which seems specifically devised to
cover this scenario.  So for example, if you have CALL -> CALL -> CALL,
the third call can issue a transaction statement.  But if you have CALL
-> SELECT -> CALL, then the last call cannot, because the SELECT
introduces an atomic execution context.  I don't know if we have such a
thing yet or something that we could easily latch on to.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 1b3318154540d0fe71480ca58938433ecfccabbd Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Tue, 31 Oct 2017 14:48:51 -0400
Subject: [PATCH v1] Transaction control in PL/Python procedures

Add .commit, .rollback, and .start_transaction functions to plpy module
to control transactions in a PL/Python function.  Add similar underlying
functions to SPI.  Some additional cleanup so that transaction commit or
abort doesn't blow away data structures still used by the procedure
call.
---
 src/backend/commands/functioncmds.c               |  3 +
 src/backend/executor/spi.c                        | 71 +++++++++++++++++------
 src/backend/utils/mmgr/portalmem.c                | 39 +++++--------
 src/include/executor/spi.h                        |  4 ++
 src/include/executor/spi_priv.h                   |  1 +
 src/pl/plpython/Makefile                          |  1 +
 src/pl/plpython/expected/plpython_test.out        |  9 ++-
 src/pl/plpython/expected/plpython_transaction.out | 44 ++++++++++++++
 src/pl/plpython/plpy_main.c                       |  2 +-
 src/pl/plpython/plpy_plpymodule.c                 | 38 ++++++++++++
 src/pl/plpython/sql/plpython_transaction.sql      | 36 ++++++++++++
 11 files changed, 201 insertions(+), 47 deletions(-)
 create mode 100644 src/pl/plpython/expected/plpython_transaction.out
 create mode 100644 src/pl/plpython/sql/plpython_transaction.sql

diff --git a/src/backend/commands/functioncmds.c 
b/src/backend/commands/functioncmds.c
index 1f3156d870..badef78fdf 100644
--- a/src/backend/commands/functioncmds.c
+++ b/src/backend/commands/functioncmds.c
@@ -65,6 +65,7 @@
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
@@ -2255,6 +2256,8 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt)
                                                           FUNC_MAX_ARGS,
                                                           FUNC_MAX_ARGS)));
 
+       MemoryContextSwitchTo(PortalContext);
+
        fmgr_info(fexpr->funcid, &flinfo);
        InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, 
NULL, NULL);
 
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 40292b86c1..f097f15731 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -92,7 +92,7 @@ SPI_connect(void)
                        elog(ERROR, "SPI stack corrupted");
                newdepth = 16;
                _SPI_stack = (_SPI_connection *)
-                       MemoryContextAlloc(TopTransactionContext,
+                       MemoryContextAlloc(TopMemoryContext,
                                                           newdepth * 
sizeof(_SPI_connection));
                _SPI_stack_depth = newdepth;
        }
@@ -133,10 +133,10 @@ SPI_connect(void)
         * Perhaps CurTransactionContext would do?      For now it doesn't 
matter
         * because we clean up explicitly in AtEOSubXact_SPI().
         */
-       _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext,
+       _SPI_current->procCxt = AllocSetContextCreate(PortalContext,
                                                                                
                  "SPI Proc",
                                                                                
                  ALLOCSET_DEFAULT_SIZES);
-       _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext,
+       _SPI_current->execCxt = AllocSetContextCreate(_SPI_current->procCxt,
                                                                                
                  "SPI Exec",
                                                                                
                  ALLOCSET_DEFAULT_SIZES);
        /* ... and switch to procedure's context */
@@ -158,8 +158,6 @@ SPI_finish(void)
        MemoryContextSwitchTo(_SPI_current->savedcxt);
 
        /* Release memory used in procedure call (including tuptables) */
-       MemoryContextDelete(_SPI_current->execCxt);
-       _SPI_current->execCxt = NULL;
        MemoryContextDelete(_SPI_current->procCxt);
        _SPI_current->procCxt = NULL;
 
@@ -181,12 +179,58 @@ SPI_finish(void)
        return SPI_OK_FINISH;
 }
 
+int
+SPI_start_transaction(void)
+{
+       MemoryContext oldcontext = CurrentMemoryContext;
+
+       StartTransactionCommand();
+       MemoryContextSwitchTo(oldcontext);
+       return 0;
+}
+
+
+int
+SPI_commit(void)
+{
+       MemoryContext oldcontext = CurrentMemoryContext;
+
+       _SPI_current->internal_xact = true;
+
+       if (ActiveSnapshotSet())
+               PopActiveSnapshot();
+       CommitTransactionCommand();
+       MemoryContextSwitchTo(oldcontext);
+
+       _SPI_current->internal_xact = false;
+
+       return 0;
+}
+
+int
+SPI_rollback(void)
+{
+       MemoryContext oldcontext = CurrentMemoryContext;
+
+       _SPI_current->internal_xact = true;
+
+       AbortCurrentTransaction();
+       MemoryContextSwitchTo(oldcontext);
+
+       _SPI_current->internal_xact = false;
+
+       return 0;
+}
+
 /*
  * Clean up SPI state at transaction commit or abort.
  */
 void
 AtEOXact_SPI(bool isCommit)
 {
+       if (_SPI_current && _SPI_current->internal_xact)
+               return;
+
        /*
         * Note that memory contexts belonging to SPI stack entries will be 
freed
         * automatically, so we can ignore them here.  We just need to restore 
our
@@ -224,21 +268,10 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
                if (connection->connectSubid != mySubid)
                        break;                          /* couldn't be any 
underneath it either */
 
-               found = true;
+               if (connection->internal_xact)
+                       break;
 
-               /*
-                * Release procedure memory explicitly (see note in SPI_connect)
-                */
-               if (connection->execCxt)
-               {
-                       MemoryContextDelete(connection->execCxt);
-                       connection->execCxt = NULL;
-               }
-               if (connection->procCxt)
-               {
-                       MemoryContextDelete(connection->procCxt);
-                       connection->procCxt = NULL;
-               }
+               found = true;
 
                /*
                 * Pop the stack entry and reset global variables.  Unlike
diff --git a/src/backend/utils/mmgr/portalmem.c 
b/src/backend/utils/mmgr/portalmem.c
index 89db08464f..d07dc060a5 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -736,11 +736,8 @@ PreCommit_Portals(bool isPrepare)
 /*
  * Abort processing for portals.
  *
- * At this point we reset "active" status and run the cleanup hook if
- * present, but we can't release the portal's memory until the cleanup call.
- *
- * The reason we need to reset active is so that we can replace the unnamed
- * portal, else we'll fail to execute ROLLBACK when it arrives.
+ * At this point we run the cleanup hook if present, but we can't release the
+ * portal's memory until the cleanup call.
  */
 void
 AtAbort_Portals(void)
@@ -754,17 +751,6 @@ AtAbort_Portals(void)
        {
                Portal          portal = hentry->portal;
 
-               /*
-                * See similar code in AtSubAbort_Portals().  This would fire 
if code
-                * orchestrating multiple top-level transactions within a 
portal, such
-                * as VACUUM, caught errors and continued under the same portal 
with a
-                * fresh transaction.  No part of core PostgreSQL functions 
that way.
-                * XXX Such code would wish the portal to remain ACTIVE, as in
-                * PreCommit_Portals().
-                */
-               if (portal->status == PORTAL_ACTIVE)
-                       MarkPortalFailed(portal);
-
                /*
                 * Do nothing else to cursors held over from a previous 
transaction.
                 */
@@ -799,14 +785,6 @@ AtAbort_Portals(void)
                 * PortalDrop.
                 */
                portal->resowner = NULL;
-
-               /*
-                * Although we can't delete the portal data structure proper, 
we can
-                * release any memory in subsidiary contexts, such as executor 
state.
-                * The cleanup hook was the last thing that might have needed 
data
-                * there.
-                */
-               MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
        }
 }
 
@@ -826,6 +804,19 @@ AtCleanup_Portals(void)
        {
                Portal          portal = hentry->portal;
 
+               /*
+                * Do not touch active portals --- this can only happen in the 
case of
+                * a multi-transaction command.
+                *
+                * Note however that any resource owner attached to such a 
portal is
+                * still going to go away, so don't leave a dangling pointer.
+                */
+               if (portal->status == PORTAL_ACTIVE)
+               {
+                       portal->resowner = NULL;
+                       continue;
+               }
+
                /* Do nothing to cursors held over from a previous transaction 
*/
                if (portal->createSubid == InvalidSubTransactionId)
                {
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index acade7e92e..1bf73d8594 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -156,6 +156,10 @@ extern int SPI_register_relation(EphemeralNamedRelation 
enr);
 extern int     SPI_unregister_relation(const char *name);
 extern int     SPI_register_trigger_data(TriggerData *tdata);
 
+extern int     SPI_start_transaction(void);
+extern int     SPI_commit(void);
+extern int     SPI_rollback(void);
+
 extern void AtEOXact_SPI(bool isCommit);
 extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid);
 
diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h
index 8fae755418..c12a465712 100644
--- a/src/include/executor/spi_priv.h
+++ b/src/include/executor/spi_priv.h
@@ -36,6 +36,7 @@ typedef struct
        MemoryContext savedcxt;         /* context of SPI_connect's caller */
        SubTransactionId connectSubid;  /* ID of connecting subtransaction */
        QueryEnvironment *queryEnv; /* query environment setup for SPI level */
+       bool            internal_xact;  /* SPI-managed transaction boundary, 
skip cleanup */
 } _SPI_connection;
 
 /*
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index cc91afebde..d09910835d 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -90,6 +90,7 @@ REGRESS = \
        plpython_quote \
        plpython_composite \
        plpython_subtransaction \
+       plpython_transaction \
        plpython_drop
 
 REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_test.out 
b/src/pl/plpython/expected/plpython_test.out
index 847e4cc412..f0a10cc05f 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,11 +43,12 @@ contents.sort()
 return contents
 $$ LANGUAGE plpythonu;
 select module_contents();
- module_contents 
------------------
+  module_contents  
+-------------------
  Error
  Fatal
  SPIError
+ commit
  cursor
  debug
  error
@@ -60,10 +61,12 @@ select module_contents();
  quote_ident
  quote_literal
  quote_nullable
+ rollback
  spiexceptions
+ start_transaction
  subtransaction
  warning
-(18 rows)
+(21 rows)
 
 CREATE FUNCTION elog_test_basic() RETURNS void
 AS $$
diff --git a/src/pl/plpython/expected/plpython_transaction.out 
b/src/pl/plpython/expected/plpython_transaction.out
new file mode 100644
index 0000000000..64e3abdd60
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_transaction.out
@@ -0,0 +1,44 @@
+CREATE TABLE test1 (a int, b text);
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+         plpy.commit()
+    else:
+         plpy.rollback()
+$$;
+CALL transaction_test1();
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+TRUNCATE test1;
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+         plpy.commit()
+    else:
+         plpy.rollback()
+$$;
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+DROP TABLE test1;
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 7df50c09c8..0d3b4a8fbc 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -424,7 +424,7 @@ PLy_push_execution_context(void)
        PLyExecutionContext *context;
 
        context = (PLyExecutionContext *)
-               MemoryContextAlloc(TopTransactionContext, 
sizeof(PLyExecutionContext));
+               MemoryContextAlloc(PortalContext, sizeof(PLyExecutionContext));
        context->curr_proc = NULL;
        context->scratch_ctx = NULL;
        context->next = PLy_execution_contexts;
diff --git a/src/pl/plpython/plpy_plpymodule.c 
b/src/pl/plpython/plpy_plpymodule.c
index 759ad44932..a95e66bfee 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -6,8 +6,10 @@
 
 #include "postgres.h"
 
+#include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "utils/builtins.h"
+#include "utils/snapmgr.h"
 
 #include "plpython.h"
 
@@ -41,6 +43,9 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, 
PyObject *kw);
 static PyObject *PLy_quote_literal(PyObject *self, PyObject *args);
 static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args);
 static PyObject *PLy_quote_ident(PyObject *self, PyObject *args);
+static PyObject *PLy_start_transaction(PyObject *self, PyObject *args);
+static PyObject *PLy_commit(PyObject *self, PyObject *args);
+static PyObject *PLy_rollback(PyObject *self, PyObject *args);
 
 
 /* A list of all known exceptions, generated from backend/utils/errcodes.txt */
@@ -95,6 +100,13 @@ static PyMethodDef PLy_methods[] = {
         */
        {"cursor", PLy_cursor, METH_VARARGS, NULL},
 
+       /*
+        * transaction control
+        */
+       {"start_transaction", PLy_start_transaction, METH_NOARGS, NULL},
+       {"commit", PLy_commit, METH_NOARGS, NULL},
+       {"rollback", PLy_rollback, METH_NOARGS, NULL},
+
        {NULL, NULL, 0, NULL}
 };
 
@@ -577,3 +589,29 @@ PLy_output(volatile int level, PyObject *self, PyObject 
*args, PyObject *kw)
         */
        Py_RETURN_NONE;
 }
+
+static PyObject *
+PLy_start_transaction(PyObject *self, PyObject *args)
+{
+       SPI_start_transaction();
+
+       Py_RETURN_NONE;
+}
+
+static PyObject *
+PLy_commit(PyObject *self, PyObject *args)
+{
+       SPI_commit();
+       SPI_start_transaction();
+
+       Py_RETURN_NONE;
+}
+
+static PyObject *
+PLy_rollback(PyObject *self, PyObject *args)
+{
+       SPI_rollback();
+       SPI_start_transaction();
+
+       Py_RETURN_NONE;
+}
diff --git a/src/pl/plpython/sql/plpython_transaction.sql 
b/src/pl/plpython/sql/plpython_transaction.sql
new file mode 100644
index 0000000000..42f191b008
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_transaction.sql
@@ -0,0 +1,36 @@
+CREATE TABLE test1 (a int, b text);
+
+
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+         plpy.commit()
+    else:
+         plpy.rollback()
+$$;
+
+CALL transaction_test1();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+         plpy.commit()
+    else:
+         plpy.rollback()
+$$;
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;

base-commit: ee4673ac071f8352c41cc673299b7ec695f079ff
prerequisite-patch-id: 7b37b5c8905dcab64b9c6b8f98caf81049a569ec
-- 
2.14.3

-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to