On 10/31/17 15:38, Peter Eisentraut wrote:
> Here is a patch that implements transaction control in PL/Python
> procedures.  (This patch goes on top of "SQL procedures" patch v1.)

Here is an updated patch, now on top of "SQL procedures" v2.

Relative to the previous patch v1 I added transaction control to
PL/pgSQL, PL/Perl, and PL/Tcl with relative ease.  (There is a weird
crash in one PL/Perl test that is currently commented out.  And I can't
get a proper backtrace.  Maybe something to do with recursive Perl
interpreters?)

I crash-coursed myself in PL/Perl and PL/Tcl (and Tcl).  If anyone has
more of a feel for those languages and wants to comment on the proposed
interfaces and internals, please chime in.

I also added tracking so that transaction control commands can only be
made in the proper context, currently meaning only top-level procedure
calls, not functions or other procedure calls.  This should be extended
to also allow nested CALLs without anything in between, but I need more
time to code that.

I'll spend a bit more time on tidying up a few things, and a bunch of
documentation is missing, but I currently don't see any more major
issues here.

-- 
Peter Eisentraut              http://www.2ndQuadrant.com/
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services
From 522c8f20ddcb3f670b0a5687bdc4c7b18d5eaeae Mon Sep 17 00:00:00 2001
From: Peter Eisentraut <pete...@gmx.net>
Date: Tue, 14 Nov 2017 18:00:26 -0500
Subject: [PATCH v2] Transaction control in PL procedures

In each of the supplied procedural languages (PL/pgSQL, PL/Perl,
PL/Python, PL/Tcl), add language-specific commit and rollback
functions/commands to control transactions in procedures in that
language.  Add similar underlying functions to SPI.  Some additional
cleanup so that transaction commit or abort doesn't blow away data
structures still used by the procedure call.  Add execution context
tracking to CALL statement so that transaction control commands can only
be issued in top-level procedure calls, not function calls or other
procedure calls.
---
 src/backend/commands/functioncmds.c               | 29 ++++++-
 src/backend/executor/spi.c                        | 93 +++++++++++++++++-----
 src/backend/tcop/utility.c                        |  2 +-
 src/backend/utils/mmgr/portalmem.c                | 39 ++++-----
 src/include/commands/defrem.h                     | 12 ++-
 src/include/executor/spi.h                        |  5 ++
 src/include/executor/spi_priv.h                   |  4 +
 src/include/nodes/nodes.h                         |  3 +-
 src/pl/plperl/GNUmakefile                         |  2 +-
 src/pl/plperl/SPI.xs                              | 12 +++
 src/pl/plperl/expected/plperl_transaction.out     | 85 ++++++++++++++++++++
 src/pl/plperl/plperl.c                            |  6 ++
 src/pl/plperl/sql/plperl_transaction.sql          | 76 ++++++++++++++++++
 src/pl/plpgsql/src/pl_exec.c                      | 48 +++++++++++-
 src/pl/plpgsql/src/pl_funcs.c                     | 44 +++++++++++
 src/pl/plpgsql/src/pl_gram.y                      | 34 ++++++++
 src/pl/plpgsql/src/pl_handler.c                   |  8 ++
 src/pl/plpgsql/src/pl_scanner.c                   |  2 +
 src/pl/plpgsql/src/plpgsql.h                      | 22 +++++-
 src/pl/plpython/Makefile                          |  1 +
 src/pl/plpython/expected/plpython_test.out        |  9 ++-
 src/pl/plpython/expected/plpython_transaction.out | 83 ++++++++++++++++++++
 src/pl/plpython/plpy_main.c                       |  7 +-
 src/pl/plpython/plpy_plpymodule.c                 | 38 +++++++++
 src/pl/plpython/sql/plpython_transaction.sql      | 69 ++++++++++++++++
 src/pl/tcl/Makefile                               |  2 +-
 src/pl/tcl/expected/pltcl_transaction.out         | 63 +++++++++++++++
 src/pl/tcl/pltcl.c                                | 45 +++++++++++
 src/pl/tcl/sql/pltcl_transaction.sql              | 60 ++++++++++++++
 src/test/regress/expected/plpgsql.out             | 96 +++++++++++++++++++++++
 src/test/regress/sql/plpgsql.sql                  | 82 +++++++++++++++++++
 31 files changed, 1025 insertions(+), 56 deletions(-)
 create mode 100644 src/pl/plperl/expected/plperl_transaction.out
 create mode 100644 src/pl/plperl/sql/plperl_transaction.sql
 create mode 100644 src/pl/plpython/expected/plpython_transaction.out
 create mode 100644 src/pl/plpython/sql/plpython_transaction.sql
 create mode 100644 src/pl/tcl/expected/pltcl_transaction.out
 create mode 100644 src/pl/tcl/sql/pltcl_transaction.sql

diff --git a/src/backend/commands/functioncmds.c 
b/src/backend/commands/functioncmds.c
index 1f3156d870..263f3a69c4 100644
--- a/src/backend/commands/functioncmds.c
+++ b/src/backend/commands/functioncmds.c
@@ -65,6 +65,7 @@
 #include "utils/fmgroids.h"
 #include "utils/guc.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
@@ -2207,9 +2208,28 @@ ExecuteDoStmt(DoStmt *stmt)
 
 /*
  * Execute CALL statement
+ *
+ * Inside a top-level CALL statement, transaction-terminating commands such as
+ * COMMIT or a PL-specific equivalent are allowed.  The terminology in the SQL
+ * standard is that CALL establishes a non-atomic execution context.  Most
+ * other commands establish an atomic execution context, in which transaction
+ * control actions are not allowed.  If there are nested executions of CALL,
+ * we want to track the execution context recursively, so that the nested
+ * CALLs can also do transaction control.  Note, however, that for example in
+ * CALL -> SELECT -> CALL, the second call cannot do transaction control,
+ * because the SELECT in between establishes an atomic execution context.
+ *
+ * So when ExecuteCallStmt() is called from the top level, we pass in atomic =
+ * false (recall that that means transactions = yes).  We then create a
+ * CallContext node with content atomic = false, which is passed in the
+ * fcinfo->context field to the procedure invocation.  The language
+ * implementation should then take appropriate measures to allow or prevent
+ * transaction commands based on that information, e.g., call
+ * SPI_set_nonatomic().  The language should also pass on the atomic flag to
+ * any recursive invocations to CALL.
  */
 void
-ExecuteCallStmt(ParseState *pstate, CallStmt *stmt)
+ExecuteCallStmt(ParseState *pstate, CallStmt *stmt, bool atomic)
 {
        List       *targs;
        ListCell   *lc;
@@ -2220,6 +2240,7 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt)
        AclResult   aclresult;
        FmgrInfo        flinfo;
        FunctionCallInfoData fcinfo;
+       CallContext *callcontext;
 
        targs = NIL;
        foreach(lc, stmt->funccall->args)
@@ -2255,8 +2276,12 @@ ExecuteCallStmt(ParseState *pstate, CallStmt *stmt)
                                                           FUNC_MAX_ARGS,
                                                           FUNC_MAX_ARGS)));
 
+       MemoryContextSwitchTo(PortalContext);
+
+       callcontext = makeNode(CallContext);
+       callcontext->atomic = atomic;
        fmgr_info(fexpr->funcid, &flinfo);
-       InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, 
NULL, NULL);
+       InitFunctionCallInfoData(fcinfo, &flinfo, nargs, fexpr->inputcollid, 
(Node *) callcontext, NULL);
 
        i = 0;
        foreach (lc, fexpr->args)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 2da1cac3e2..7d47a90638 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -92,7 +92,7 @@ SPI_connect(void)
                        elog(ERROR, "SPI stack corrupted");
                newdepth = 16;
                _SPI_stack = (_SPI_connection *)
-                       MemoryContextAlloc(TopTransactionContext,
+                       MemoryContextAlloc(TopMemoryContext,
                                                           newdepth * 
sizeof(_SPI_connection));
                _SPI_stack_depth = newdepth;
        }
@@ -124,6 +124,7 @@ SPI_connect(void)
        _SPI_current->execCxt = NULL;
        _SPI_current->connectSubid = GetCurrentSubTransactionId();
        _SPI_current->queryEnv = NULL;
+       _SPI_current->atomic = true;    /* until told otherwise */
 
        /*
         * Create memory contexts for this procedure
@@ -133,10 +134,10 @@ SPI_connect(void)
         * Perhaps CurTransactionContext would do?      For now it doesn't 
matter
         * because we clean up explicitly in AtEOSubXact_SPI().
         */
-       _SPI_current->procCxt = AllocSetContextCreate(TopTransactionContext,
+       _SPI_current->procCxt = AllocSetContextCreate(PortalContext,
                                                                                
                  "SPI Proc",
                                                                                
                  ALLOCSET_DEFAULT_SIZES);
-       _SPI_current->execCxt = AllocSetContextCreate(TopTransactionContext,
+       _SPI_current->execCxt = AllocSetContextCreate(_SPI_current->procCxt,
                                                                                
                  "SPI Exec",
                                                                                
                  ALLOCSET_DEFAULT_SIZES);
        /* ... and switch to procedure's context */
@@ -145,6 +146,17 @@ SPI_connect(void)
        return SPI_OK_CONNECT;
 }
 
+int
+SPI_set_nonatomic(void)
+{
+       if (_SPI_current == NULL)
+               return SPI_ERROR_UNCONNECTED;
+
+       _SPI_current->atomic = false;
+
+       return SPI_OK_CONNECT;
+}
+
 int
 SPI_finish(void)
 {
@@ -158,8 +170,6 @@ SPI_finish(void)
        MemoryContextSwitchTo(_SPI_current->savedcxt);
 
        /* Release memory used in procedure call (including tuptables) */
-       MemoryContextDelete(_SPI_current->execCxt);
-       _SPI_current->execCxt = NULL;
        MemoryContextDelete(_SPI_current->procCxt);
        _SPI_current->procCxt = NULL;
 
@@ -181,12 +191,68 @@ SPI_finish(void)
        return SPI_OK_FINISH;
 }
 
+int
+SPI_start_transaction(void)
+{
+       MemoryContext oldcontext = CurrentMemoryContext;
+
+       StartTransactionCommand();
+       MemoryContextSwitchTo(oldcontext);
+       return 0;
+}
+
+
+int
+SPI_commit(void)
+{
+       MemoryContext oldcontext = CurrentMemoryContext;
+
+       if (_SPI_current->atomic)
+               ereport(ERROR,
+                               
(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
+                                errmsg("invalid transaction termination")));
+
+       _SPI_current->internal_xact = true;
+
+       if (ActiveSnapshotSet())
+               PopActiveSnapshot();
+       CommitTransactionCommand();
+       MemoryContextSwitchTo(oldcontext);
+
+       _SPI_current->internal_xact = false;
+
+       return 0;
+}
+
+int
+SPI_rollback(void)
+{
+       MemoryContext oldcontext = CurrentMemoryContext;
+
+       if (_SPI_current->atomic)
+               ereport(ERROR,
+                               
(errcode(ERRCODE_INVALID_TRANSACTION_TERMINATION),
+                                errmsg("invalid transaction termination")));
+
+       _SPI_current->internal_xact = true;
+
+       AbortCurrentTransaction();
+       MemoryContextSwitchTo(oldcontext);
+
+       _SPI_current->internal_xact = false;
+
+       return 0;
+}
+
 /*
  * Clean up SPI state at transaction commit or abort.
  */
 void
 AtEOXact_SPI(bool isCommit)
 {
+       if (_SPI_current && _SPI_current->internal_xact)
+               return;
+
        /*
         * Note that memory contexts belonging to SPI stack entries will be 
freed
         * automatically, so we can ignore them here.  We just need to restore 
our
@@ -224,21 +290,10 @@ AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid)
                if (connection->connectSubid != mySubid)
                        break;                          /* couldn't be any 
underneath it either */
 
-               found = true;
+               if (connection->internal_xact)
+                       break;
 
-               /*
-                * Release procedure memory explicitly (see note in SPI_connect)
-                */
-               if (connection->execCxt)
-               {
-                       MemoryContextDelete(connection->execCxt);
-                       connection->execCxt = NULL;
-               }
-               if (connection->procCxt)
-               {
-                       MemoryContextDelete(connection->procCxt);
-                       connection->procCxt = NULL;
-               }
+               found = true;
 
                /*
                 * Pop the stack entry and reset global variables.  Unlike
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index 4da1f8f643..1e8de07e11 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -658,7 +658,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                        break;
 
                case T_CallStmt:
-                       ExecuteCallStmt(pstate, castNode(CallStmt, parsetree));
+                       ExecuteCallStmt(pstate, castNode(CallStmt, parsetree), 
context != PROCESS_UTILITY_TOPLEVEL);
                        break;
 
                case T_ClusterStmt:
diff --git a/src/backend/utils/mmgr/portalmem.c 
b/src/backend/utils/mmgr/portalmem.c
index d03b779407..1b217c312a 100644
--- a/src/backend/utils/mmgr/portalmem.c
+++ b/src/backend/utils/mmgr/portalmem.c
@@ -736,11 +736,8 @@ PreCommit_Portals(bool isPrepare)
 /*
  * Abort processing for portals.
  *
- * At this point we reset "active" status and run the cleanup hook if
- * present, but we can't release the portal's memory until the cleanup call.
- *
- * The reason we need to reset active is so that we can replace the unnamed
- * portal, else we'll fail to execute ROLLBACK when it arrives.
+ * At this point we run the cleanup hook if present, but we can't release the
+ * portal's memory until the cleanup call.
  */
 void
 AtAbort_Portals(void)
@@ -754,17 +751,6 @@ AtAbort_Portals(void)
        {
                Portal          portal = hentry->portal;
 
-               /*
-                * See similar code in AtSubAbort_Portals().  This would fire 
if code
-                * orchestrating multiple top-level transactions within a 
portal, such
-                * as VACUUM, caught errors and continued under the same portal 
with a
-                * fresh transaction.  No part of core PostgreSQL functions 
that way.
-                * XXX Such code would wish the portal to remain ACTIVE, as in
-                * PreCommit_Portals().
-                */
-               if (portal->status == PORTAL_ACTIVE)
-                       MarkPortalFailed(portal);
-
                /*
                 * Do nothing else to cursors held over from a previous 
transaction.
                 */
@@ -799,14 +785,6 @@ AtAbort_Portals(void)
                 * PortalDrop.
                 */
                portal->resowner = NULL;
-
-               /*
-                * Although we can't delete the portal data structure proper, 
we can
-                * release any memory in subsidiary contexts, such as executor 
state.
-                * The cleanup hook was the last thing that might have needed 
data
-                * there.
-                */
-               MemoryContextDeleteChildren(PortalGetHeapMemory(portal));
        }
 }
 
@@ -826,6 +804,19 @@ AtCleanup_Portals(void)
        {
                Portal          portal = hentry->portal;
 
+               /*
+                * Do not touch active portals --- this can only happen in the 
case of
+                * a multi-transaction command.
+                *
+                * Note however that any resource owner attached to such a 
portal is
+                * still going to go away, so don't leave a dangling pointer.
+                */
+               if (portal->status == PORTAL_ACTIVE)
+               {
+                       portal->resowner = NULL;
+                       continue;
+               }
+
                /* Do nothing to cursors held over from a previous transaction 
*/
                if (portal->createSubid == InvalidSubTransactionId)
                {
diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h
index 52cbf61ccb..8ebe3c48aa 100644
--- a/src/include/commands/defrem.h
+++ b/src/include/commands/defrem.h
@@ -59,7 +59,7 @@ extern void DropTransformById(Oid transformOid);
 extern void IsThereFunctionInNamespace(const char *proname, int pronargs,
                                                   oidvector *proargtypes, Oid 
nspOid);
 extern void ExecuteDoStmt(DoStmt *stmt);
-extern void ExecuteCallStmt(ParseState *pstate, CallStmt *stmt);
+extern void ExecuteCallStmt(ParseState *pstate, CallStmt *stmt, bool atomic);
 extern Oid     get_cast_oid(Oid sourcetypeid, Oid targettypeid, bool 
missing_ok);
 extern Oid     get_transform_oid(Oid type_id, Oid lang_id, bool missing_ok);
 extern void interpret_function_parameter_list(ParseState *pstate,
@@ -74,6 +74,16 @@ extern void interpret_function_parameter_list(ParseState 
*pstate,
                                                                  Oid 
*variadicArgType,
                                                                  Oid 
*requiredResultType);
 
+/*
+ * Procedure call context information
+ */
+typedef struct CallContext
+{
+       NodeTag         type;
+       bool            atomic;
+} CallContext;
+
+
 /* commands/operatorcmds.c */
 extern ObjectAddress DefineOperator(List *names, List *parameters);
 extern void RemoveOperatorById(Oid operOid);
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index acade7e92e..e38158c54e 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -78,6 +78,7 @@ extern PGDLLIMPORT SPITupleTable *SPI_tuptable;
 extern PGDLLIMPORT int SPI_result;
 
 extern int     SPI_connect(void);
+extern int     SPI_set_nonatomic(void);
 extern int     SPI_finish(void);
 extern int     SPI_execute(const char *src, bool read_only, long tcount);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
@@ -156,6 +157,10 @@ extern int SPI_register_relation(EphemeralNamedRelation 
enr);
 extern int     SPI_unregister_relation(const char *name);
 extern int     SPI_register_trigger_data(TriggerData *tdata);
 
+extern int     SPI_start_transaction(void);
+extern int     SPI_commit(void);
+extern int     SPI_rollback(void);
+
 extern void AtEOXact_SPI(bool isCommit);
 extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid);
 
diff --git a/src/include/executor/spi_priv.h b/src/include/executor/spi_priv.h
index 8fae755418..9dc8af22e4 100644
--- a/src/include/executor/spi_priv.h
+++ b/src/include/executor/spi_priv.h
@@ -36,6 +36,10 @@ typedef struct
        MemoryContext savedcxt;         /* context of SPI_connect's caller */
        SubTransactionId connectSubid;  /* ID of connecting subtransaction */
        QueryEnvironment *queryEnv; /* query environment setup for SPI level */
+
+       /* transaction management support */
+       bool            atomic;                 /* atomic execution context, 
does not allow transactions */
+       bool            internal_xact;  /* SPI-managed transaction boundary, 
skip cleanup */
 } _SPI_connection;
 
 /*
diff --git a/src/include/nodes/nodes.h b/src/include/nodes/nodes.h
index 43ee88bd39..82dbf4e177 100644
--- a/src/include/nodes/nodes.h
+++ b/src/include/nodes/nodes.h
@@ -499,7 +499,8 @@ typedef enum NodeTag
        T_FdwRoutine,                           /* in foreign/fdwapi.h */
        T_IndexAmRoutine,                       /* in access/amapi.h */
        T_TsmRoutine,                           /* in access/tsmapi.h */
-       T_ForeignKeyCacheInfo           /* in utils/rel.h */
+       T_ForeignKeyCacheInfo,          /* in utils/rel.h */
+       T_CallContext                           /* in commands/defrem.h */
 } NodeTag;
 
 /*
diff --git a/src/pl/plperl/GNUmakefile b/src/pl/plperl/GNUmakefile
index b829027d05..933abb47c4 100644
--- a/src/pl/plperl/GNUmakefile
+++ b/src/pl/plperl/GNUmakefile
@@ -55,7 +55,7 @@ endif # win32
 SHLIB_LINK = $(perl_embed_ldflags)
 
 REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-extension=plperl  
--load-extension=plperlu
-REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog 
plperl_util plperl_init plperlu plperl_array plperl_call
+REGRESS = plperl plperl_lc plperl_trigger plperl_shared plperl_elog 
plperl_util plperl_init plperlu plperl_array plperl_call plperl_transaction
 # if Perl can support two interpreters in one backend,
 # test plperl-and-plperlu cases
 ifneq ($(PERL),)
diff --git a/src/pl/plperl/SPI.xs b/src/pl/plperl/SPI.xs
index d9e6f579d4..dc9ddedc8f 100644
--- a/src/pl/plperl/SPI.xs
+++ b/src/pl/plperl/SPI.xs
@@ -17,6 +17,7 @@
 #define PG_NEED_PERL_XSUB_H
 #include "plperl.h"
 #include "plperl_helpers.h"
+#include "executor/spi.h"
 
 
 MODULE = PostgreSQL::InServer::SPI PREFIX = spi_
@@ -152,6 +153,17 @@ spi_spi_cursor_close(sv)
                plperl_spi_cursor_close(cursor);
                pfree(cursor);
 
+void
+spi_spi_commit()
+       CODE:
+               SPI_commit();
+               SPI_start_transaction();
+
+void
+spi_spi_rollback()
+       CODE:
+               SPI_rollback();
+               SPI_start_transaction();
 
 BOOT:
     items = 0;  /* avoid 'unused variable' warning */
diff --git a/src/pl/plperl/expected/plperl_transaction.out 
b/src/pl/plperl/expected/plperl_transaction.out
new file mode 100644
index 0000000000..38e9651d3e
--- /dev/null
+++ b/src/pl/plperl/expected/plperl_transaction.out
@@ -0,0 +1,85 @@
+CREATE TABLE test1 (a int, b text);
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+    spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+    if ($i % 2 == 0) {
+        spi_commit();
+    } else {
+        spi_rollback();
+    }
+}
+$$;
+CALL transaction_test1();
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+TRUNCATE test1;
+DO
+LANGUAGE plperl
+$$
+foreach my $i (0..9) {
+    spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+    if ($i % 2 == 0) {
+        spi_commit();
+    } else {
+        spi_rollback();
+    }
+}
+$$;
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+TRUNCATE test1;
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+    spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+    if ($i % 2 == 0) {
+        spi_commit();
+    } else {
+        spi_rollback();
+    }
+}
+return 1;
+$$;
+SELECT transaction_test2();
+ERROR:  invalid transaction termination
+CONTEXT:  PL/Perl function "transaction_test2"
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plperl
+AS $$
+spi_exec_query("CALL transaction_test1()");
+return 1;
+$$;
+-- FIXME
+--SELECT transaction_test3();
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+DROP TABLE test1;
diff --git a/src/pl/plperl/plperl.c b/src/pl/plperl/plperl.c
index 9f5313235f..f3133e816b 100644
--- a/src/pl/plperl/plperl.c
+++ b/src/pl/plperl/plperl.c
@@ -23,6 +23,7 @@
 #include "catalog/pg_proc.h"
 #include "catalog/pg_proc_fn.h"
 #include "catalog/pg_type.h"
+#include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
@@ -1931,6 +1932,7 @@ plperl_inline_handler(PG_FUNCTION_ARGS)
 
                if (SPI_connect() != SPI_OK_CONNECT)
                        elog(ERROR, "could not connect to SPI manager");
+               SPI_set_nonatomic();
 
                select_perl_context(desc.lanpltrusted);
 
@@ -2409,6 +2411,10 @@ plperl_func_handler(PG_FUNCTION_ARGS)
        current_call_data->prodesc = prodesc;
        increment_prodesc_refcount(prodesc);
 
+       if (prodesc->result_oid == InvalidOid &&
+               !castNode(CallContext, fcinfo->context)->atomic)
+               SPI_set_nonatomic();
+
        /* Set a callback for error reporting */
        pl_error_context.callback = plperl_exec_callback;
        pl_error_context.previous = error_context_stack;
diff --git a/src/pl/plperl/sql/plperl_transaction.sql 
b/src/pl/plperl/sql/plperl_transaction.sql
new file mode 100644
index 0000000000..864a3e396c
--- /dev/null
+++ b/src/pl/plperl/sql/plperl_transaction.sql
@@ -0,0 +1,76 @@
+CREATE TABLE test1 (a int, b text);
+
+
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+    spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+    if ($i % 2 == 0) {
+        spi_commit();
+    } else {
+        spi_rollback();
+    }
+}
+$$;
+
+CALL transaction_test1();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+DO
+LANGUAGE plperl
+$$
+foreach my $i (0..9) {
+    spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+    if ($i % 2 == 0) {
+        spi_commit();
+    } else {
+        spi_rollback();
+    }
+}
+$$;
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plperl
+AS $$
+foreach my $i (0..9) {
+    spi_exec_query("INSERT INTO test1 (a) VALUES ($i)");
+    if ($i % 2 == 0) {
+        spi_commit();
+    } else {
+        spi_rollback();
+    }
+}
+return 1;
+$$;
+
+SELECT transaction_test2();
+
+SELECT * FROM test1;
+
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plperl
+AS $$
+spi_exec_query("CALL transaction_test1()");
+return 1;
+$$;
+
+-- FIXME
+--SELECT transaction_test3();
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index 882b16e2b1..2943618d62 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -215,6 +215,10 @@ static int exec_stmt_dynexecute(PLpgSQL_execstate *estate,
                                         PLpgSQL_stmt_dynexecute *stmt);
 static int exec_stmt_dynfors(PLpgSQL_execstate *estate,
                                  PLpgSQL_stmt_dynfors *stmt);
+static int exec_stmt_commit(PLpgSQL_execstate *estate,
+                               PLpgSQL_stmt_commit *stmt);
+static int exec_stmt_rollback(PLpgSQL_execstate *estate,
+                               PLpgSQL_stmt_rollback *stmt);
 
 static void plpgsql_estate_setup(PLpgSQL_execstate *estate,
                                         PLpgSQL_function *func,
@@ -1652,6 +1656,14 @@ exec_stmt(PLpgSQL_execstate *estate, PLpgSQL_stmt *stmt)
                        rc = exec_stmt_close(estate, (PLpgSQL_stmt_close *) 
stmt);
                        break;
 
+               case PLPGSQL_STMT_COMMIT:
+                       rc = exec_stmt_commit(estate, (PLpgSQL_stmt_commit *) 
stmt);
+                       break;
+
+               case PLPGSQL_STMT_ROLLBACK:
+                       rc = exec_stmt_rollback(estate, (PLpgSQL_stmt_rollback 
*) stmt);
+                       break;
+
                default:
                        estate->err_stmt = save_estmt;
                        elog(ERROR, "unrecognized cmdtype: %d", stmt->cmd_type);
@@ -4356,6 +4368,39 @@ exec_stmt_close(PLpgSQL_execstate *estate, 
PLpgSQL_stmt_close *stmt)
        return PLPGSQL_RC_OK;
 }
 
+/*
+ * exec_stmt_commit
+ *
+ * Commit the transaction.
+ */
+static int
+exec_stmt_commit(PLpgSQL_execstate *estate, PLpgSQL_stmt_commit *stmt)
+{
+       SPI_commit();
+       SPI_start_transaction();
+
+       estate->simple_eval_estate = NULL;
+       plpgsql_create_econtext(estate);
+
+       return PLPGSQL_RC_OK;
+}
+
+/*
+ * exec_stmt_rollback
+ *
+ * Abort the transaction.
+ */
+static int
+exec_stmt_rollback(PLpgSQL_execstate *estate, PLpgSQL_stmt_rollback *stmt)
+{
+       SPI_rollback();
+       SPI_start_transaction();
+
+       estate->simple_eval_estate = NULL;
+       plpgsql_create_econtext(estate);
+
+       return PLPGSQL_RC_OK;
+}
 
 /* ----------
  * exec_assign_expr                    Put an expression's result into a 
variable.
@@ -6858,7 +6903,8 @@ plpgsql_xact_cb(XactEvent event, void *arg)
        if (event == XACT_EVENT_COMMIT || event == XACT_EVENT_PREPARE)
        {
                /* Shouldn't be any econtext stack entries left at commit */
-               Assert(simple_econtext_stack == NULL);
+               //Assert(simple_econtext_stack == NULL);
+               simple_econtext_stack = NULL;
 
                if (shared_simple_eval_estate)
                        FreeExecutorState(shared_simple_eval_estate);
diff --git a/src/pl/plpgsql/src/pl_funcs.c b/src/pl/plpgsql/src/pl_funcs.c
index 23f54e1c21..a3a45916d1 100644
--- a/src/pl/plpgsql/src/pl_funcs.c
+++ b/src/pl/plpgsql/src/pl_funcs.c
@@ -284,6 +284,10 @@ plpgsql_stmt_typename(PLpgSQL_stmt *stmt)
                        return "CLOSE";
                case PLPGSQL_STMT_PERFORM:
                        return "PERFORM";
+               case PLPGSQL_STMT_COMMIT:
+                       return "COMMIT";
+               case PLPGSQL_STMT_ROLLBACK:
+                       return "ROLLBACK";
        }
 
        return "unknown";
@@ -363,6 +367,8 @@ static void free_open(PLpgSQL_stmt_open *stmt);
 static void free_fetch(PLpgSQL_stmt_fetch *stmt);
 static void free_close(PLpgSQL_stmt_close *stmt);
 static void free_perform(PLpgSQL_stmt_perform *stmt);
+static void free_commit(PLpgSQL_stmt_commit *stmt);
+static void free_rollback(PLpgSQL_stmt_rollback *stmt);
 static void free_expr(PLpgSQL_expr *expr);
 
 
@@ -443,6 +449,12 @@ free_stmt(PLpgSQL_stmt *stmt)
                case PLPGSQL_STMT_PERFORM:
                        free_perform((PLpgSQL_stmt_perform *) stmt);
                        break;
+               case PLPGSQL_STMT_COMMIT:
+                       free_commit((PLpgSQL_stmt_commit *) stmt);
+                       break;
+               case PLPGSQL_STMT_ROLLBACK:
+                       free_rollback((PLpgSQL_stmt_rollback *) stmt);
+                       break;
                default:
                        elog(ERROR, "unrecognized cmd_type: %d", 
stmt->cmd_type);
                        break;
@@ -590,6 +602,16 @@ free_perform(PLpgSQL_stmt_perform *stmt)
        free_expr(stmt->expr);
 }
 
+static void
+free_commit(PLpgSQL_stmt_commit *stmt)
+{
+}
+
+static void
+free_rollback(PLpgSQL_stmt_rollback *stmt)
+{
+}
+
 static void
 free_exit(PLpgSQL_stmt_exit *stmt)
 {
@@ -777,6 +799,8 @@ static void dump_fetch(PLpgSQL_stmt_fetch *stmt);
 static void dump_cursor_direction(PLpgSQL_stmt_fetch *stmt);
 static void dump_close(PLpgSQL_stmt_close *stmt);
 static void dump_perform(PLpgSQL_stmt_perform *stmt);
+static void dump_commit(PLpgSQL_stmt_commit *stmt);
+static void dump_rollback(PLpgSQL_stmt_rollback *stmt);
 static void dump_expr(PLpgSQL_expr *expr);
 
 
@@ -867,6 +891,12 @@ dump_stmt(PLpgSQL_stmt *stmt)
                case PLPGSQL_STMT_PERFORM:
                        dump_perform((PLpgSQL_stmt_perform *) stmt);
                        break;
+               case PLPGSQL_STMT_COMMIT:
+                       dump_commit((PLpgSQL_stmt_commit *) stmt);
+                       break;
+               case PLPGSQL_STMT_ROLLBACK:
+                       dump_rollback((PLpgSQL_stmt_rollback *) stmt);
+                       break;
                default:
                        elog(ERROR, "unrecognized cmd_type: %d", 
stmt->cmd_type);
                        break;
@@ -1243,6 +1273,20 @@ dump_perform(PLpgSQL_stmt_perform *stmt)
        printf("\n");
 }
 
+static void
+dump_commit(PLpgSQL_stmt_commit *stmt)
+{
+       dump_ind();
+       printf("COMMIT\n");
+}
+
+static void
+dump_rollback(PLpgSQL_stmt_rollback *stmt)
+{
+       dump_ind();
+       printf("ROLLBACK\n");
+}
+
 static void
 dump_exit(PLpgSQL_stmt_exit *stmt)
 {
diff --git a/src/pl/plpgsql/src/pl_gram.y b/src/pl/plpgsql/src/pl_gram.y
index 94f1f58593..e661750176 100644
--- a/src/pl/plpgsql/src/pl_gram.y
+++ b/src/pl/plpgsql/src/pl_gram.y
@@ -199,6 +199,7 @@ static      void                    
check_raise_parameters(PLpgSQL_stmt_raise *stmt);
 %type <stmt>   stmt_return stmt_raise stmt_assert stmt_execsql
 %type <stmt>   stmt_dynexecute stmt_for stmt_perform stmt_getdiag
 %type <stmt>   stmt_open stmt_fetch stmt_move stmt_close stmt_null
+%type <stmt>   stmt_commit stmt_rollback
 %type <stmt>   stmt_case stmt_foreach_a
 
 %type <list>   proc_exceptions
@@ -261,6 +262,7 @@ static      void                    
check_raise_parameters(PLpgSQL_stmt_raise *stmt);
 %token <keyword>       K_COLLATE
 %token <keyword>       K_COLUMN
 %token <keyword>       K_COLUMN_NAME
+%token <keyword>       K_COMMIT
 %token <keyword>       K_CONSTANT
 %token <keyword>       K_CONSTRAINT
 %token <keyword>       K_CONSTRAINT_NAME
@@ -326,6 +328,7 @@ static      void                    
check_raise_parameters(PLpgSQL_stmt_raise *stmt);
 %token <keyword>       K_RETURN
 %token <keyword>       K_RETURNED_SQLSTATE
 %token <keyword>       K_REVERSE
+%token <keyword>       K_ROLLBACK
 %token <keyword>       K_ROW_COUNT
 %token <keyword>       K_ROWTYPE
 %token <keyword>       K_SCHEMA
@@ -898,6 +901,10 @@ proc_stmt          : pl_block ';'
                                                { $$ = $1; }
                                | stmt_null
                                                { $$ = $1; }
+                               | stmt_commit
+                                               { $$ = $1; }
+                               | stmt_rollback
+                                               { $$ = $1; }
                                ;
 
 stmt_perform   : K_PERFORM expr_until_semi
@@ -2174,6 +2181,31 @@ stmt_null                : K_NULL ';'
                                        }
                                ;
 
+stmt_commit            : K_COMMIT ';'
+                                       {
+                                               PLpgSQL_stmt_commit *new;
+
+                                               new = 
palloc(sizeof(PLpgSQL_stmt_commit));
+                                               new->cmd_type = 
PLPGSQL_STMT_COMMIT;
+                                               new->lineno = 
plpgsql_location_to_lineno(@1);
+
+                                               $$ = (PLpgSQL_stmt *)new;
+                                       }
+                               ;
+
+stmt_rollback  : K_ROLLBACK ';'
+                                       {
+                                               PLpgSQL_stmt_rollback *new;
+
+                                               new = 
palloc(sizeof(PLpgSQL_stmt_rollback));
+                                               new->cmd_type = 
PLPGSQL_STMT_ROLLBACK;
+                                               new->lineno = 
plpgsql_location_to_lineno(@1);
+
+                                               $$ = (PLpgSQL_stmt *)new;
+                                       }
+                               ;
+
+
 cursor_variable        : T_DATUM
                                        {
                                                /*
@@ -2410,6 +2442,7 @@ unreserved_keyword        :
                                | K_COLLATE
                                | K_COLUMN
                                | K_COLUMN_NAME
+                               | K_COMMIT
                                | K_CONSTANT
                                | K_CONSTRAINT
                                | K_CONSTRAINT_NAME
@@ -2461,6 +2494,7 @@ unreserved_keyword        :
                                | K_RETURN
                                | K_RETURNED_SQLSTATE
                                | K_REVERSE
+                               | K_ROLLBACK
                                | K_ROW_COUNT
                                | K_ROWTYPE
                                | K_SCHEMA
diff --git a/src/pl/plpgsql/src/pl_handler.c b/src/pl/plpgsql/src/pl_handler.c
index 1ebb7a7b5e..93a7ff2df3 100644
--- a/src/pl/plpgsql/src/pl_handler.c
+++ b/src/pl/plpgsql/src/pl_handler.c
@@ -18,6 +18,7 @@
 #include "access/htup_details.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/defrem.h"
 #include "funcapi.h"
 #include "miscadmin.h"
 #include "utils/builtins.h"
@@ -255,7 +256,12 @@ plpgsql_call_handler(PG_FUNCTION_ARGS)
                        retval = (Datum) 0;
                }
                else
+               {
+                       if (func->fn_rettype == InvalidOid &&
+                               !castNode(CallContext, fcinfo->context)->atomic)
+                               SPI_set_nonatomic();
                        retval = plpgsql_exec_function(func, fcinfo, NULL);
+               }
        }
        PG_CATCH();
        {
@@ -304,6 +310,8 @@ plpgsql_inline_handler(PG_FUNCTION_ARGS)
        if ((rc = SPI_connect()) != SPI_OK_CONNECT)
                elog(ERROR, "SPI_connect failed: %s", 
SPI_result_code_string(rc));
 
+       SPI_set_nonatomic();
+
        /* Compile the anonymous code block */
        func = plpgsql_compile_inline(codeblock->source_text);
 
diff --git a/src/pl/plpgsql/src/pl_scanner.c b/src/pl/plpgsql/src/pl_scanner.c
index 553be8c93c..a172031db2 100644
--- a/src/pl/plpgsql/src/pl_scanner.c
+++ b/src/pl/plpgsql/src/pl_scanner.c
@@ -106,6 +106,7 @@ static const ScanKeyword unreserved_keywords[] = {
        PG_KEYWORD("collate", K_COLLATE, UNRESERVED_KEYWORD)
        PG_KEYWORD("column", K_COLUMN, UNRESERVED_KEYWORD)
        PG_KEYWORD("column_name", K_COLUMN_NAME, UNRESERVED_KEYWORD)
+       PG_KEYWORD("commit", K_COMMIT, UNRESERVED_KEYWORD)
        PG_KEYWORD("constant", K_CONSTANT, UNRESERVED_KEYWORD)
        PG_KEYWORD("constraint", K_CONSTRAINT, UNRESERVED_KEYWORD)
        PG_KEYWORD("constraint_name", K_CONSTRAINT_NAME, UNRESERVED_KEYWORD)
@@ -158,6 +159,7 @@ static const ScanKeyword unreserved_keywords[] = {
        PG_KEYWORD("return", K_RETURN, UNRESERVED_KEYWORD)
        PG_KEYWORD("returned_sqlstate", K_RETURNED_SQLSTATE, UNRESERVED_KEYWORD)
        PG_KEYWORD("reverse", K_REVERSE, UNRESERVED_KEYWORD)
+       PG_KEYWORD("rollback", K_ROLLBACK, UNRESERVED_KEYWORD)
        PG_KEYWORD("row_count", K_ROW_COUNT, UNRESERVED_KEYWORD)
        PG_KEYWORD("rowtype", K_ROWTYPE, UNRESERVED_KEYWORD)
        PG_KEYWORD("schema", K_SCHEMA, UNRESERVED_KEYWORD)
diff --git a/src/pl/plpgsql/src/plpgsql.h b/src/pl/plpgsql/src/plpgsql.h
index 2b19948562..8494a4374d 100644
--- a/src/pl/plpgsql/src/plpgsql.h
+++ b/src/pl/plpgsql/src/plpgsql.h
@@ -105,7 +105,9 @@ typedef enum PLpgSQL_stmt_type
        PLPGSQL_STMT_OPEN,
        PLPGSQL_STMT_FETCH,
        PLPGSQL_STMT_CLOSE,
-       PLPGSQL_STMT_PERFORM
+       PLPGSQL_STMT_PERFORM,
+       PLPGSQL_STMT_COMMIT,
+       PLPGSQL_STMT_ROLLBACK
 } PLpgSQL_stmt_type;
 
 /*
@@ -433,6 +435,24 @@ typedef struct PLpgSQL_stmt_perform
        PLpgSQL_expr *expr;
 } PLpgSQL_stmt_perform;
 
+/*
+ * COMMIT statement
+ */
+typedef struct PLpgSQL_stmt_commit
+{
+       PLpgSQL_stmt_type cmd_type;
+       int                     lineno;
+} PLpgSQL_stmt_commit;
+
+/*
+ * ROLLBACK statement
+ */
+typedef struct PLpgSQL_stmt_rollback
+{
+       PLpgSQL_stmt_type cmd_type;
+       int                     lineno;
+} PLpgSQL_stmt_rollback;
+
 /*
  * GET DIAGNOSTICS item
  */
diff --git a/src/pl/plpython/Makefile b/src/pl/plpython/Makefile
index cc91afebde..d09910835d 100644
--- a/src/pl/plpython/Makefile
+++ b/src/pl/plpython/Makefile
@@ -90,6 +90,7 @@ REGRESS = \
        plpython_quote \
        plpython_composite \
        plpython_subtransaction \
+       plpython_transaction \
        plpython_drop
 
 REGRESS_PLPYTHON3_MANGLE := $(REGRESS)
diff --git a/src/pl/plpython/expected/plpython_test.out 
b/src/pl/plpython/expected/plpython_test.out
index 847e4cc412..f0a10cc05f 100644
--- a/src/pl/plpython/expected/plpython_test.out
+++ b/src/pl/plpython/expected/plpython_test.out
@@ -43,11 +43,12 @@ contents.sort()
 return contents
 $$ LANGUAGE plpythonu;
 select module_contents();
- module_contents 
------------------
+  module_contents  
+-------------------
  Error
  Fatal
  SPIError
+ commit
  cursor
  debug
  error
@@ -60,10 +61,12 @@ select module_contents();
  quote_ident
  quote_literal
  quote_nullable
+ rollback
  spiexceptions
+ start_transaction
  subtransaction
  warning
-(18 rows)
+(21 rows)
 
 CREATE FUNCTION elog_test_basic() RETURNS void
 AS $$
diff --git a/src/pl/plpython/expected/plpython_transaction.out 
b/src/pl/plpython/expected/plpython_transaction.out
new file mode 100644
index 0000000000..c664b5d1d2
--- /dev/null
+++ b/src/pl/plpython/expected/plpython_transaction.out
@@ -0,0 +1,83 @@
+CREATE TABLE test1 (a int, b text);
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+CALL transaction_test1();
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+TRUNCATE test1;
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+TRUNCATE test1;
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+return 1
+$$;
+SELECT transaction_test2();
+ERROR:  invalid transaction termination
+CONTEXT:  PL/Python function "transaction_test2"
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plpythonu
+AS $$
+plpy.execute("CALL transaction_test1()")
+return 1
+$$;
+SELECT transaction_test3();
+ERROR:  spiexceptions.InvalidTransactionTermination: invalid transaction 
termination
+CONTEXT:  Traceback (most recent call last):
+  PL/Python function "transaction_test3", line 2, in <module>
+    plpy.execute("CALL transaction_test1()")
+PL/Python function "transaction_test3"
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+DROP TABLE test1;
diff --git a/src/pl/plpython/plpy_main.c b/src/pl/plpython/plpy_main.c
index 7df50c09c8..ae83fcf5d1 100644
--- a/src/pl/plpython/plpy_main.c
+++ b/src/pl/plpython/plpy_main.c
@@ -9,6 +9,7 @@
 #include "access/htup_details.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
 #include "miscadmin.h"
@@ -262,6 +263,9 @@ plpython_call_handler(PG_FUNCTION_ARGS)
                else
                {
                        proc = PLy_procedure_get(funcoid, InvalidOid, false);
+                       if (proc->is_procedure &&
+                               !castNode(CallContext, fcinfo->context)->atomic)
+                                       SPI_set_nonatomic();
                        exec_ctx->curr_proc = proc;
                        retval = PLy_exec_function(fcinfo, proc);
                }
@@ -305,6 +309,7 @@ plpython_inline_handler(PG_FUNCTION_ARGS)
        /* Note: SPI_finish() happens in plpy_exec.c, which is dubious design */
        if (SPI_connect() != SPI_OK_CONNECT)
                elog(ERROR, "SPI_connect failed");
+       SPI_set_nonatomic();
 
        MemSet(&fake_fcinfo, 0, sizeof(fake_fcinfo));
        MemSet(&flinfo, 0, sizeof(flinfo));
@@ -424,7 +429,7 @@ PLy_push_execution_context(void)
        PLyExecutionContext *context;
 
        context = (PLyExecutionContext *)
-               MemoryContextAlloc(TopTransactionContext, 
sizeof(PLyExecutionContext));
+               MemoryContextAlloc(PortalContext, sizeof(PLyExecutionContext));
        context->curr_proc = NULL;
        context->scratch_ctx = NULL;
        context->next = PLy_execution_contexts;
diff --git a/src/pl/plpython/plpy_plpymodule.c 
b/src/pl/plpython/plpy_plpymodule.c
index 759ad44932..a95e66bfee 100644
--- a/src/pl/plpython/plpy_plpymodule.c
+++ b/src/pl/plpython/plpy_plpymodule.c
@@ -6,8 +6,10 @@
 
 #include "postgres.h"
 
+#include "access/xact.h"
 #include "mb/pg_wchar.h"
 #include "utils/builtins.h"
+#include "utils/snapmgr.h"
 
 #include "plpython.h"
 
@@ -41,6 +43,9 @@ static PyObject *PLy_fatal(PyObject *self, PyObject *args, 
PyObject *kw);
 static PyObject *PLy_quote_literal(PyObject *self, PyObject *args);
 static PyObject *PLy_quote_nullable(PyObject *self, PyObject *args);
 static PyObject *PLy_quote_ident(PyObject *self, PyObject *args);
+static PyObject *PLy_start_transaction(PyObject *self, PyObject *args);
+static PyObject *PLy_commit(PyObject *self, PyObject *args);
+static PyObject *PLy_rollback(PyObject *self, PyObject *args);
 
 
 /* A list of all known exceptions, generated from backend/utils/errcodes.txt */
@@ -95,6 +100,13 @@ static PyMethodDef PLy_methods[] = {
         */
        {"cursor", PLy_cursor, METH_VARARGS, NULL},
 
+       /*
+        * transaction control
+        */
+       {"start_transaction", PLy_start_transaction, METH_NOARGS, NULL},
+       {"commit", PLy_commit, METH_NOARGS, NULL},
+       {"rollback", PLy_rollback, METH_NOARGS, NULL},
+
        {NULL, NULL, 0, NULL}
 };
 
@@ -577,3 +589,29 @@ PLy_output(volatile int level, PyObject *self, PyObject 
*args, PyObject *kw)
         */
        Py_RETURN_NONE;
 }
+
+static PyObject *
+PLy_start_transaction(PyObject *self, PyObject *args)
+{
+       SPI_start_transaction();
+
+       Py_RETURN_NONE;
+}
+
+static PyObject *
+PLy_commit(PyObject *self, PyObject *args)
+{
+       SPI_commit();
+       SPI_start_transaction();
+
+       Py_RETURN_NONE;
+}
+
+static PyObject *
+PLy_rollback(PyObject *self, PyObject *args)
+{
+       SPI_rollback();
+       SPI_start_transaction();
+
+       Py_RETURN_NONE;
+}
diff --git a/src/pl/plpython/sql/plpython_transaction.sql 
b/src/pl/plpython/sql/plpython_transaction.sql
new file mode 100644
index 0000000000..ac98a81e1b
--- /dev/null
+++ b/src/pl/plpython/sql/plpython_transaction.sql
@@ -0,0 +1,69 @@
+CREATE TABLE test1 (a int, b text);
+
+
+CREATE PROCEDURE transaction_test1()
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+
+CALL transaction_test1();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+DO
+LANGUAGE plpythonu
+$$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+$$;
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE plpythonu
+AS $$
+for i in range(0, 10):
+    plpy.execute("INSERT INTO test1 (a) VALUES (%d)" % i)
+    if i % 2 == 0:
+        plpy.commit()
+    else:
+        plpy.rollback()
+return 1
+$$;
+
+SELECT transaction_test2();
+
+SELECT * FROM test1;
+
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE plpythonu
+AS $$
+plpy.execute("CALL transaction_test1()")
+return 1
+$$;
+
+SELECT transaction_test3();
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;
diff --git a/src/pl/tcl/Makefile b/src/pl/tcl/Makefile
index 6a92a9b6aa..ef61ee596e 100644
--- a/src/pl/tcl/Makefile
+++ b/src/pl/tcl/Makefile
@@ -28,7 +28,7 @@ DATA = pltcl.control pltcl--1.0.sql 
pltcl--unpackaged--1.0.sql \
        pltclu.control pltclu--1.0.sql pltclu--unpackaged--1.0.sql
 
 REGRESS_OPTS = --dbname=$(PL_TESTDB) --load-extension=pltcl
-REGRESS = pltcl_setup pltcl_queries pltcl_call pltcl_start_proc pltcl_subxact 
pltcl_unicode
+REGRESS = pltcl_setup pltcl_queries pltcl_call pltcl_start_proc pltcl_subxact 
pltcl_unicode pltcl_transaction
 
 # Tcl on win32 ships with import libraries only for Microsoft Visual C++,
 # which are not compatible with mingw gcc. Therefore we need to build a
diff --git a/src/pl/tcl/expected/pltcl_transaction.out 
b/src/pl/tcl/expected/pltcl_transaction.out
new file mode 100644
index 0000000000..6ce900027c
--- /dev/null
+++ b/src/pl/tcl/expected/pltcl_transaction.out
@@ -0,0 +1,63 @@
+-- suppress CONTEXT so that function OIDs aren't in output
+\set VERBOSITY terse
+CREATE TABLE test1 (a int, b text);
+CREATE PROCEDURE transaction_test1()
+LANGUAGE pltcl
+AS $$
+for {set i 0} {$i < 10} {incr i} {
+    spi_exec "INSERT INTO test1 (a) VALUES ($i)"
+    if {$i % 2 == 0} {
+        commit
+    } else {
+        rollback
+    }
+}
+$$;
+CALL transaction_test1();
+SELECT * FROM test1;
+ a | b 
+---+---
+ 0 | 
+ 2 | 
+ 4 | 
+ 6 | 
+ 8 | 
+(5 rows)
+
+TRUNCATE test1;
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE pltcl
+AS $$
+for {set i 0} {$i < 10} {incr i} {
+    spi_exec "INSERT INTO test1 (a) VALUES ($i)"
+    if {$i % 2 == 0} {
+        commit
+    } else {
+        rollback
+    }
+}
+return 1
+$$;
+SELECT transaction_test2();
+ERROR:  invalid transaction termination
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE pltcl
+AS $$
+spi_exec "CALL transaction_test1()"
+return 1
+$$;
+SELECT transaction_test3();
+ERROR:  invalid transaction termination
+SELECT * FROM test1;
+ a | b 
+---+---
+(0 rows)
+
+DROP TABLE test1;
diff --git a/src/pl/tcl/pltcl.c b/src/pl/tcl/pltcl.c
index e0792d93e1..816882e92c 100644
--- a/src/pl/tcl/pltcl.c
+++ b/src/pl/tcl/pltcl.c
@@ -18,6 +18,7 @@
 #include "catalog/objectaccess.h"
 #include "catalog/pg_proc.h"
 #include "catalog/pg_type.h"
+#include "commands/defrem.h"
 #include "commands/event_trigger.h"
 #include "commands/trigger.h"
 #include "executor/spi.h"
@@ -312,6 +313,10 @@ static int pltcl_SPI_lastoid(ClientData cdata, Tcl_Interp 
*interp,
                                  int objc, Tcl_Obj *const objv[]);
 static int pltcl_subtransaction(ClientData cdata, Tcl_Interp *interp,
                                         int objc, Tcl_Obj *const objv[]);
+static int pltcl_commit(ClientData cdata, Tcl_Interp *interp,
+                                        int objc, Tcl_Obj *const objv[]);
+static int pltcl_rollback(ClientData cdata, Tcl_Interp *interp,
+                                        int objc, Tcl_Obj *const objv[]);
 
 static void pltcl_subtrans_begin(MemoryContext oldcontext,
                                         ResourceOwner oldowner);
@@ -524,6 +529,10 @@ pltcl_init_interp(pltcl_interp_desc *interp_desc, Oid 
prolang, bool pltrusted)
                                                 pltcl_SPI_lastoid, NULL, NULL);
        Tcl_CreateObjCommand(interp, "subtransaction",
                                                 pltcl_subtransaction, NULL, 
NULL);
+       Tcl_CreateObjCommand(interp, "commit",
+                                                pltcl_commit, NULL, NULL);
+       Tcl_CreateObjCommand(interp, "rollback",
+                                                pltcl_rollback, NULL, NULL);
 
        /************************************************************
         * Call the appropriate start_proc, if there is one.
@@ -812,6 +821,10 @@ pltcl_func_handler(PG_FUNCTION_ARGS, pltcl_call_state 
*call_state,
        prodesc = compile_pltcl_function(fcinfo->flinfo->fn_oid, InvalidOid,
                                                                         false, 
pltrusted);
 
+       if (prodesc->result_typid == InvalidOid &&
+               !castNode(CallContext, fcinfo->context)->atomic)
+               SPI_set_nonatomic();
+
        call_state->prodesc = prodesc;
        prodesc->fn_refcount++;
 
@@ -2935,6 +2948,38 @@ pltcl_subtransaction(ClientData cdata, Tcl_Interp 
*interp,
 }
 
 
+/**********************************************************************
+ * pltcl_commit()
+ *
+ * Commit the transaction and start a new one.
+ **********************************************************************/
+static int
+pltcl_commit(ClientData cdata, Tcl_Interp *interp,
+                        int objc, Tcl_Obj *const objv[])
+{
+       SPI_commit();
+       SPI_start_transaction();
+
+       return TCL_OK;
+}
+
+
+/**********************************************************************
+ * pltcl_rollback()
+ *
+ * Abort the transaction and start a new one.
+ **********************************************************************/
+static int
+pltcl_rollback(ClientData cdata, Tcl_Interp *interp,
+                          int objc, Tcl_Obj *const objv[])
+{
+       SPI_rollback();
+       SPI_start_transaction();
+
+       return TCL_OK;
+}
+
+
 /**********************************************************************
  * pltcl_set_tuple_values() - Set variables for all attributes
  *                               of a given tuple
diff --git a/src/pl/tcl/sql/pltcl_transaction.sql 
b/src/pl/tcl/sql/pltcl_transaction.sql
new file mode 100644
index 0000000000..14aed5844a
--- /dev/null
+++ b/src/pl/tcl/sql/pltcl_transaction.sql
@@ -0,0 +1,60 @@
+-- suppress CONTEXT so that function OIDs aren't in output
+\set VERBOSITY terse
+
+CREATE TABLE test1 (a int, b text);
+
+
+CREATE PROCEDURE transaction_test1()
+LANGUAGE pltcl
+AS $$
+for {set i 0} {$i < 10} {incr i} {
+    spi_exec "INSERT INTO test1 (a) VALUES ($i)"
+    if {$i % 2 == 0} {
+        commit
+    } else {
+        rollback
+    }
+}
+$$;
+
+CALL transaction_test1();
+
+SELECT * FROM test1;
+
+
+TRUNCATE test1;
+
+-- not allowed in a function
+CREATE FUNCTION transaction_test2() RETURNS int
+LANGUAGE pltcl
+AS $$
+for {set i 0} {$i < 10} {incr i} {
+    spi_exec "INSERT INTO test1 (a) VALUES ($i)"
+    if {$i % 2 == 0} {
+        commit
+    } else {
+        rollback
+    }
+}
+return 1
+$$;
+
+SELECT transaction_test2();
+
+SELECT * FROM test1;
+
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION transaction_test3() RETURNS int
+LANGUAGE pltcl
+AS $$
+spi_exec "CALL transaction_test1()"
+return 1
+$$;
+
+SELECT transaction_test3();
+
+SELECT * FROM test1;
+
+
+DROP TABLE test1;
diff --git a/src/test/regress/expected/plpgsql.out 
b/src/test/regress/expected/plpgsql.out
index d6e5bc3353..040f667f8e 100644
--- a/src/test/regress/expected/plpgsql.out
+++ b/src/test/regress/expected/plpgsql.out
@@ -6077,7 +6077,103 @@ SELECT * FROM proc_test1;
  55
 (1 row)
 
+TRUNCATE proc_test1;
+CREATE PROCEDURE test_proc_transaction1()
+LANGUAGE plpgsql
+AS $$
+BEGIN
+    FOR i IN 0..9 LOOP
+        INSERT INTO proc_test1 (a) VALUES (i);
+        IF i % 2 = 0 THEN
+            COMMIT;
+        ELSE
+            ROLLBACK;
+        END IF;
+    END LOOP;
+END
+$$;
+CALL test_proc_transaction1();
+SELECT * FROM proc_test1;
+ a 
+---
+ 0
+ 2
+ 4
+ 6
+ 8
+(5 rows)
+
+TRUNCATE proc_test1;
+DO
+LANGUAGE plpgsql
+$$
+BEGIN
+    FOR i IN 0..9 LOOP
+        INSERT INTO proc_test1 (a) VALUES (i);
+        IF i % 2 = 0 THEN
+            COMMIT;
+        ELSE
+            ROLLBACK;
+        END IF;
+    END LOOP;
+END
+$$;
+SELECT * FROM proc_test1;
+ a 
+---
+ 0
+ 2
+ 4
+ 6
+ 8
+(5 rows)
+
+TRUNCATE proc_test1;
+-- not allowed in a function
+CREATE FUNCTION test_proc_transaction2() RETURNS int
+LANGUAGE plpgsql
+AS $$
+BEGIN
+    FOR i IN 0..9 LOOP
+        INSERT INTO proc_test1 (a) VALUES (i);
+        IF i % 2 = 0 THEN
+            COMMIT;
+        ELSE
+            ROLLBACK;
+        END IF;
+    END LOOP;
+    RETURN 1;
+END
+$$;
+SELECT test_proc_transaction2();
+ERROR:  invalid transaction termination
+CONTEXT:  PL/pgSQL function test_proc_transaction2() line 6 at COMMIT
+SELECT * FROM proc_test1;
+ a 
+---
+(0 rows)
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION test_proc_transaction3() RETURNS int
+LANGUAGE plpgsql
+AS $$
+BEGIN
+    CALL test_proc_transaction1();
+    RETURN 1;
+END;
+$$;
+SELECT test_proc_transaction3();
+ERROR:  invalid transaction termination
+CONTEXT:  PL/pgSQL function test_proc_transaction1() line 6 at COMMIT
+SQL statement "CALL test_proc_transaction1()"
+PL/pgSQL function test_proc_transaction3() line 3 at SQL statement
+SELECT * FROM proc_test1;
+ a 
+---
+(0 rows)
+
 DROP PROCEDURE test_proc1;
 DROP PROCEDURE test_proc2;
 DROP PROCEDURE test_proc3;
+DROP PROCEDURE test_proc_transaction1;
 DROP TABLE proc_test1;
diff --git a/src/test/regress/sql/plpgsql.sql b/src/test/regress/sql/plpgsql.sql
index 1c355132b7..a0e4821e0c 100644
--- a/src/test/regress/sql/plpgsql.sql
+++ b/src/test/regress/sql/plpgsql.sql
@@ -4864,8 +4864,90 @@ CREATE PROCEDURE test_proc3(x int)
 SELECT * FROM proc_test1;
 
 
+TRUNCATE proc_test1;
+
+CREATE PROCEDURE test_proc_transaction1()
+LANGUAGE plpgsql
+AS $$
+BEGIN
+    FOR i IN 0..9 LOOP
+        INSERT INTO proc_test1 (a) VALUES (i);
+        IF i % 2 = 0 THEN
+            COMMIT;
+        ELSE
+            ROLLBACK;
+        END IF;
+    END LOOP;
+END
+$$;
+
+CALL test_proc_transaction1();
+
+SELECT * FROM proc_test1;
+
+
+TRUNCATE proc_test1;
+
+DO
+LANGUAGE plpgsql
+$$
+BEGIN
+    FOR i IN 0..9 LOOP
+        INSERT INTO proc_test1 (a) VALUES (i);
+        IF i % 2 = 0 THEN
+            COMMIT;
+        ELSE
+            ROLLBACK;
+        END IF;
+    END LOOP;
+END
+$$;
+
+SELECT * FROM proc_test1;
+
+
+TRUNCATE proc_test1;
+
+-- not allowed in a function
+CREATE FUNCTION test_proc_transaction2() RETURNS int
+LANGUAGE plpgsql
+AS $$
+BEGIN
+    FOR i IN 0..9 LOOP
+        INSERT INTO proc_test1 (a) VALUES (i);
+        IF i % 2 = 0 THEN
+            COMMIT;
+        ELSE
+            ROLLBACK;
+        END IF;
+    END LOOP;
+    RETURN 1;
+END
+$$;
+
+SELECT test_proc_transaction2();
+
+SELECT * FROM proc_test1;
+
+
+-- also not allowed if procedure is called from a function
+CREATE FUNCTION test_proc_transaction3() RETURNS int
+LANGUAGE plpgsql
+AS $$
+BEGIN
+    CALL test_proc_transaction1();
+    RETURN 1;
+END;
+$$;
+
+SELECT test_proc_transaction3();
+
+SELECT * FROM proc_test1;
+
+
 DROP PROCEDURE test_proc1;
 DROP PROCEDURE test_proc2;
 DROP PROCEDURE test_proc3;
+DROP PROCEDURE test_proc_transaction1;
 
 DROP TABLE proc_test1;
-- 
2.15.0

Reply via email to