On 1/5/17 9:50 PM, Jim Nasby wrote:
The * on that is there's something odd going on where plpython starts
out really fast at this, then gets 100% slower. I've reached out to some
python folks about that. Even so, the overall results from a quick test
on my laptop are (IMHO) impressive:

            Old Code        New Code    Improvement
Pure SQL    2 sec          2 sec
plpython    12.7-14 sec    4-10 sec     ~1.3-3x
plpython - SQL 10.7-12 sec 2-8 sec      ~1.3-6x

Pure SQL is how long an equivalent query takes to run with just SQL.
plpython - SQL is simply the raw python times minus the pure SQL time.

I finally got all the kinks worked out and did some testing with python 3. Performance for my test [1] improved ~460% when returning a dict of lists (as opposed to the current list of dicts). Based on previous testing, I expect that using this method to return a list of dicts will be about 8% slower. The inconsistency in results on 2.7 has to do with how python 2 handles ints.

Someone who's familiar with pl/perl should take a look at this and see if it would apply there. I've attached the SPI portion of this patch.

I think the last step here is to figure out how to support switching between the current behavior and the "columnar" behavior of a dict of lists. I believe the best way to do that is to add two optional arguments to the execution functions: container=[] and members={}, and then copy those to produce the output objects. That means you can get the new behavior by doing something like:

plpy.execute('...', container={}, members=[])

Or, more interesting, you could do:

plpy.execute('...', container=Pandas.DataFrame, members=Pandas.Series)

since that's what a lot of people are going to want anyway.

In the future we could also add a GUC to change the default behavior.

Any concerns with that approach?

1:
d = plpy.execute('SELECT s AS some_table_id, s AS some_field_name, s AS 
some_other_field_name FROM generate_series(1,{}) s'.format(iter) )
return len(d['some_table_id'])
--
Jim Nasby, Data Architect, Blue Treble Consulting, Austin TX
Experts in Analytics, Data Architecture and PostgreSQL
Data in Trouble? Get it in Treble! http://BlueTreble.com
855-TREBLE2 (855-873-2532)
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index 7bd37283b7..97585d272e 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -55,7 +55,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, 
SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount);
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
                                        Datum *Values, const char *Nulls);
@@ -320,7 +321,34 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
        res = _SPI_execute_plan(&plan, NULL,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+int
+SPI_execute_callback(const char *src, bool read_only, long tcount,
+               DestReceiver *callback)
+{
+       _SPI_plan       plan;
+       int                     res;
+
+       if (src == NULL || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       memset(&plan, 0, sizeof(_SPI_plan));
+       plan.magic = _SPI_PLAN_MAGIC;
+       plan.cursor_options = 0;
+
+       _SPI_prepare_oneshot_plan(src, &plan);
+
+       res = _SPI_execute_plan(&plan, NULL,
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -354,7 +382,34 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
+
+       _SPI_end_call(true);
+       return res;
+}
+
+/* Execute a previously prepared plan with a callback Destination */
+int
+SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback)
+{
+       int                     res;
+
+       if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+               return SPI_ERROR_ARGUMENT;
+
+       if (plan->nargs > 0 && Values == NULL)
+               return SPI_ERROR_PARAM;
+
+       res = _SPI_begin_call(true);
+       if (res < 0)
+               return res;
+
+       res = _SPI_execute_plan(plan,
+                                                       
_SPI_convert_params(plan->nargs, plan->argtypes,
+                                                                               
                Values, Nulls),
+                                                       InvalidSnapshot, 
InvalidSnapshot,
+                                                       read_only, true, 
tcount, callback);
 
        _SPI_end_call(true);
        return res;
@@ -383,7 +438,7 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, 
ParamListInfo params,
 
        res = _SPI_execute_plan(plan, params,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -424,7 +479,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
                                                        
_SPI_convert_params(plan->nargs, plan->argtypes,
                                                                                
                Values, Nulls),
                                                        snapshot, 
crosscheck_snapshot,
-                                                       read_only, 
fire_triggers, tcount);
+                                                       read_only, 
fire_triggers, tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -471,7 +526,7 @@ SPI_execute_with_args(const char *src,
 
        res = _SPI_execute_plan(&plan, paramLI,
                                                        InvalidSnapshot, 
InvalidSnapshot,
-                                                       read_only, true, 
tcount);
+                                                       read_only, true, 
tcount, NULL);
 
        _SPI_end_call(true);
        return res;
@@ -1892,7 +1947,8 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr 
plan)
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                  Snapshot snapshot, Snapshot 
crosscheck_snapshot,
-                                 bool read_only, bool fire_triggers, uint64 
tcount)
+                                 bool read_only, bool fire_triggers, uint64 
tcount,
+                                 DestReceiver *callback)
 {
        int                     my_res = 0;
        uint64          my_processed = 0;
@@ -1903,6 +1959,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
        ErrorContextCallback spierrcontext;
        CachedPlan *cplan = NULL;
        ListCell   *lc1;
+       DestReceiver *dest = callback;
 
        /*
         * Setup error traceback support for ereport()
@@ -2020,7 +2077,6 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                {
                        PlannedStmt *stmt = (PlannedStmt *) lfirst(lc2);
                        bool            canSetTag = stmt->canSetTag;
-                       DestReceiver *dest;
 
                        _SPI_current->processed = 0;
                        _SPI_current->lastoid = InvalidOid;
@@ -2065,7 +2121,8 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                UpdateActiveSnapshotCommandId();
                        }
 
-                       dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
+                       if (!callback)
+                               dest = CreateDestReceiver(canSetTag ? DestSPI : 
DestNone);
 
                        if (stmt->utilityStmt == NULL)
                        {
@@ -2090,6 +2147,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                        {
                                char            
completionTag[COMPLETION_TAG_BUFSIZE];
 
+                               // XXX throw error if callback is set
                                ProcessUtility(stmt,
                                                           
plansource->query_string,
                                                           
PROCESS_UTILITY_QUERY,
diff --git a/src/backend/tcop/dest.c b/src/backend/tcop/dest.c
index da39f43f38..85141a0a0f 100644
--- a/src/backend/tcop/dest.c
+++ b/src/backend/tcop/dest.c
@@ -81,6 +81,11 @@ static DestReceiver spi_printtupDR = {
        DestSPI
 };
 
+static DestReceiver spi_callbackDR = {
+       donothingReceive, donothingStartup, donothingCleanup, donothingCleanup,
+       DestSPICallback
+};
+
 /* Globally available receiver for DestNone */
 DestReceiver *None_Receiver = &donothingDR;
 
@@ -117,6 +122,9 @@ CreateDestReceiver(CommandDest dest)
                case DestSPI:
                        return &spi_printtupDR;
 
+               case DestSPICallback:
+                       return &spi_callbackDR;
+
                case DestTuplestore:
                        return CreateTuplestoreDestReceiver();
 
@@ -162,6 +170,7 @@ EndCommand(const char *commandTag, CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -205,6 +214,7 @@ NullCommand(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
@@ -250,6 +260,7 @@ ReadyForQuery(CommandDest dest)
                case DestNone:
                case DestDebug:
                case DestSPI:
+               case DestSPICallback:
                case DestTuplestore:
                case DestIntoRel:
                case DestCopyOut:
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index a18ae63245..d779511130 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -74,11 +74,15 @@ extern PGDLLIMPORT int SPI_result;
 extern int     SPI_connect(void);
 extern int     SPI_finish(void);
 extern int     SPI_execute(const char *src, bool read_only, long tcount);
+extern int     SPI_execute_callback(const char *src, bool read_only, long 
tcount,
+                                                                       
DestReceiver *callback);
 extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                                 bool read_only, long tcount);
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
                                                                ParamListInfo 
params,
                                                                bool read_only, 
long tcount);
+extern int SPI_execute_plan_callback(SPIPlanPtr plan, Datum *Values, const 
char *Nulls,
+                                bool read_only, long tcount, DestReceiver 
*callback);
 extern int     SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
                  long tcount);
diff --git a/src/include/tcop/dest.h b/src/include/tcop/dest.h
index 93f9b7463a..9a8c98e267 100644
--- a/src/include/tcop/dest.h
+++ b/src/include/tcop/dest.h
@@ -90,6 +90,7 @@ typedef enum
        DestRemote,                                     /* results sent to 
frontend process */
        DestRemoteExecute,                      /* sent to frontend, in Execute 
command */
        DestSPI,                                        /* results sent to SPI 
manager */
+       DestSPICallback,                        /* results sent to 
user-specified callback function */
        DestTuplestore,                         /* results sent to Tuplestore */
        DestIntoRel,                            /* results sent to relation 
(SELECT INTO) */
        DestCopyOut,                            /* results sent to COPY TO code 
*/
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to