This relatively small change enables all sort of new and shiny evil by
allowing specification of a function to COPY that accepts each
produced row's content in turn.  The function must accept a single
INTERNAL-type argument, which is actually of the type StringInfo.

Patch highlights:

  - Grammar production changes to enable "TO FUNCTION <qualified name>"

  - A new enumeration value in CopyDest to indicate this new mode
    called COPY_FN.

  - CopyStateData's "filename" field has been renamed "destination"
    and is now a Node type.  Before it was either a string or NULL;
    now it may be a RangeVar, a string, or NULL.  Some code now has to
    go through some minor strVal() unboxing for the regular TO '/file'
    behavior.

  - Due to the relatively restricted way this function can be called
    it was possible to reduce per-row overhead by computing the
    FunctionCallInfo once and then reusing it, as opposed to simply
    using one of the convenience functions in the fmgr.

  - Add and expose the makeNameListFromRangeVar procedure to
    src/catalog/namespace.c, the inverse of makeRangeVarFromNameList.

Signed-off-by: Daniel Farina <dfar...@truviso.com>
---
 src/backend/catalog/namespace.c |   21 +++++
 src/backend/commands/copy.c     |  190 +++++++++++++++++++++++++++++++++-----
 src/backend/executor/spi.c      |    2 +-
 src/backend/nodes/copyfuncs.c   |    2 +-
 src/backend/nodes/equalfuncs.c  |    2 +-
 src/backend/parser/gram.y       |   30 ++++--
 src/include/catalog/namespace.h |    1 +
 src/include/nodes/parsenodes.h  |    3 +-
 8 files changed, 212 insertions(+), 39 deletions(-)

diff --git a/src/backend/catalog/namespace.c b/src/backend/catalog/namespace.c
index 99c9140..8911e29 100644
--- a/src/backend/catalog/namespace.c
+++ b/src/backend/catalog/namespace.c
@@ -2467,6 +2467,27 @@ QualifiedNameGetCreationNamespace(List *names, char 
**objname_p)
 }
 
 /*
+ * makeNameListFromRangeVar
+ *             Utility routine to convert a qualified-name list into RangeVar 
form.
+ */
+List *
+makeNameListFromRangeVar(RangeVar *rangevar)
+{
+       List *names = NIL;
+
+       Assert(rangevar->relname != NULL);
+       names = lcons(makeString(rangevar->relname), names);
+
+       if (rangevar->schemaname != NULL)
+               names = lcons(makeString(rangevar->schemaname), names);
+
+       if (rangevar->catalogname != NULL)
+               names = lcons(makeString(rangevar->catalogname), names);
+
+       return names;
+}
+
+/*
  * makeRangeVarFromNameList
  *             Utility routine to convert a qualified-name list into RangeVar 
form.
  */
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5e95fd7..985505a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -33,6 +33,7 @@
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "optimizer/planner.h"
+#include "parser/parse_func.h"
 #include "parser/parse_relation.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
@@ -55,7 +56,8 @@ typedef enum CopyDest
 {
        COPY_FILE,                                      /* to/from file */
        COPY_OLD_FE,                            /* to/from frontend (2.0 
protocol) */
-       COPY_NEW_FE                                     /* to/from frontend 
(3.0 protocol) */
+       COPY_NEW_FE,                            /* to/from frontend (3.0 
protocol) */
+       COPY_FN                                         /* to function */
 } CopyDest;
 
 /*
@@ -104,7 +106,8 @@ typedef struct CopyStateData
        Relation        rel;                    /* relation to copy to or from 
*/
        QueryDesc  *queryDesc;          /* executable query to copy from */
        List       *attnumlist;         /* integer list of attnums to copy */
-       char       *filename;           /* filename, or NULL for STDIN/STDOUT */
+       Node       *destination;        /* filename, or NULL for STDIN/STDOUT, 
or a
+                                                                * function */
        bool            binary;                 /* binary format? */
        bool            oids;                   /* include OIDs? */
        bool            csv_mode;               /* Comma Separated Value 
format? */
@@ -131,6 +134,13 @@ typedef struct CopyStateData
        MemoryContext rowcontext;       /* per-row evaluation context */
 
        /*
+        * For writing rows out to a function. Used if copy_dest == COPY_FN
+        *
+        * Avoids repeated use of DirectFunctionCall for efficiency.
+        */
+       FunctionCallInfoData    output_fcinfo;
+
+       /*
         * These variables are used to reduce overhead in textual COPY FROM.
         *
         * attribute_buf holds the separated, de-escaped text for each field of
@@ -425,9 +435,11 @@ CopySendEndOfRow(CopyState cstate)
 {
        StringInfo      fe_msgbuf = cstate->fe_msgbuf;
 
+       /* Take care adding row delimiters*/
        switch (cstate->copy_dest)
        {
                case COPY_FILE:
+               case COPY_FN:
                        if (!cstate->binary)
                        {
                                /* Default line termination depends on platform 
*/
@@ -437,6 +449,18 @@ CopySendEndOfRow(CopyState cstate)
                                CopySendString(cstate, "\r\n");
 #endif
                        }
+                       break;
+               case COPY_NEW_FE:
+               case COPY_OLD_FE:
+                       /* The FE/BE protocol uses \n as newline for all 
platforms */
+                       if (!cstate->binary)
+                               CopySendChar(cstate, '\n');
+                       break;
+       }
+
+       switch (cstate->copy_dest)
+       {
+               case COPY_FILE:
 
                        (void) fwrite(fe_msgbuf->data, fe_msgbuf->len,
                                                  1, cstate->copy_file);
@@ -446,10 +470,6 @@ CopySendEndOfRow(CopyState cstate)
                                                 errmsg("could not write to 
COPY file: %m")));
                        break;
                case COPY_OLD_FE:
-                       /* The FE/BE protocol uses \n as newline for all 
platforms */
-                       if (!cstate->binary)
-                               CopySendChar(cstate, '\n');
-
                        if (pq_putbytes(fe_msgbuf->data, fe_msgbuf->len))
                        {
                                /* no hope of recovering connection sync, so 
FATAL */
@@ -459,13 +479,19 @@ CopySendEndOfRow(CopyState cstate)
                        }
                        break;
                case COPY_NEW_FE:
-                       /* The FE/BE protocol uses \n as newline for all 
platforms */
-                       if (!cstate->binary)
-                               CopySendChar(cstate, '\n');
-
                        /* Dump the accumulated row as one CopyData message */
                        (void) pq_putmessage('d', fe_msgbuf->data, 
fe_msgbuf->len);
                        break;
+               case COPY_FN:
+                       FunctionCallInvoke(&cstate->output_fcinfo);
+
+                       /*
+                        * These are set earlier and are not supposed to change 
row to row.
+                        */
+                       Assert(cstate->output_fcinfo.arg[0] ==
+                                  PointerGetDatum(cstate->fe_msgbuf));
+                       Assert(!cstate->output_fcinfo.argnull[0]);
+                       break;
        }
 
        resetStringInfo(fe_msgbuf);
@@ -577,6 +603,12 @@ CopyGetData(CopyState cstate, void *databuf, int minread, 
int maxread)
                                bytesread += avail;
                        }
                        break;
+               case COPY_FN:
+                       /*
+                        * Should be disallowed by some prior step
+                        */
+                       Assert(false);
+                       break;
        }
 
        return bytesread;
@@ -719,7 +751,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 {
        CopyState       cstate;
        bool            is_from = stmt->is_from;
-       bool            pipe = (stmt->filename == NULL);
+       bool            pipe = (stmt->destination == NULL);
        List       *attnamelist = stmt->attlist;
        List       *force_quote = NIL;
        List       *force_notnull = NIL;
@@ -986,6 +1018,14 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
                                 errhint("Anyone can COPY to stdout or from 
stdin. "
                                                 "psql's \\copy command also 
works for anyone.")));
 
+       /* Disallow COPY ... FROM FUNCTION (only TO FUNCTION supported) */
+       if (is_from && cstate->destination != NULL &&
+               IsA(cstate->destination, RangeVar))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("COPY FROM does not support functions 
as sources")));
+
+
        if (stmt->relation)
        {
                Assert(!stmt->query);
@@ -1183,7 +1223,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
        cstate->encoding_embeds_ascii = 
PG_ENCODING_IS_CLIENT_ONLY(cstate->client_encoding);
 
        cstate->copy_dest = COPY_FILE;          /* default */
-       cstate->filename = stmt->filename;
+       cstate->destination = stmt->destination;
 
        if (is_from)
                CopyFrom(cstate);               /* copy from file to database */
@@ -1225,7 +1265,7 @@ DoCopy(const CopyStmt *stmt, const char *queryString)
 static void
 DoCopyTo(CopyState cstate)
 {
-       bool            pipe = (cstate->filename == NULL);
+       bool            pipe = (cstate->destination == NULL);
 
        if (cstate->rel)
        {
@@ -1257,37 +1297,128 @@ DoCopyTo(CopyState cstate)
                else
                        cstate->copy_file = stdout;
        }
-       else
+       else if (IsA(cstate->destination, String))
        {
                mode_t          oumask;         /* Pre-existing umask value */
                struct stat st;
+               char       *dest_filename = strVal(cstate->destination);
 
                /*
                 * Prevent write to relative path ... too easy to shoot oneself 
in the
                 * foot by overwriting a database file ...
                 */
-               if (!is_absolute_path(cstate->filename))
+               if (!is_absolute_path(dest_filename))
                        ereport(ERROR,
                                        (errcode(ERRCODE_INVALID_NAME),
                                         errmsg("relative path not allowed for 
COPY to file")));
 
                oumask = umask((mode_t) 022);
-               cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_W);
+               cstate->copy_file = AllocateFile(dest_filename, PG_BINARY_W);
                umask(oumask);
 
                if (cstate->copy_file == NULL)
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not open file \"%s\" for 
writing: %m",
-                                                       cstate->filename)));
+                                                       dest_filename)));
 
                fstat(fileno(cstate->copy_file), &st);
                if (S_ISDIR(st.st_mode))
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                        errmsg("\"%s\" is a directory", 
cstate->filename)));
+                                        errmsg("\"%s\" is a directory", 
dest_filename)));
        }
 
+       /* Branch taken in the "COPY ... TO FUNCTION funcname" situation */
+       else if (IsA(cstate->destination, RangeVar))
+       {
+               List                    *names;
+               FmgrInfo                *flinfo;
+               FuncDetailCode   fdresult;
+               Oid                              funcid;
+               Oid                              rettype;
+               bool                     retset;
+               int                              nvargs;
+               Oid                             *true_typeids;
+               const int                nargs          = 1;
+               Oid                              argtypes[]     = { INTERNALOID 
};
+
+               /* Flip copy-action dispatch flag */
+               cstate->copy_dest = COPY_FN;
+
+               /* Make an fcinfo that can be reused and is stored on the 
cstate. */
+               names = makeNameListFromRangeVar((RangeVar *) 
cstate->destination);
+               flinfo  = palloc0(sizeof *flinfo);
+
+
+               fdresult = func_get_detail(names, NIL, NIL, nargs, argtypes, 
false,
+                                                                  false,
+
+                                                                  /* Begin 
out-arguments */
+                                                                  &funcid, 
&rettype, &retset, &nvargs,
+                                                                  
&true_typeids, NULL);
+
+               /*
+                * Check to ensure that this is a "normal" function when looked 
up,
+                * otherwise error.
+                */
+               switch (fdresult)
+               {
+                       /* Normal function found; do nothing */
+                       case FUNCDETAIL_NORMAL:
+                               break;
+
+                       case FUNCDETAIL_NOTFOUND:
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_UNDEFINED_FUNCTION),
+                                                errmsg("function %s does not 
exist",
+                                                               
func_signature_string(names, nargs, NIL,
+                                                                               
                          argtypes))));
+                               break;
+
+                       case FUNCDETAIL_AGGREGATE:
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                                errmsg("function %s must not 
be an aggregate",
+                                                               
func_signature_string(names, nargs, NIL,
+                                                                               
                          argtypes))));
+                               break;
+
+                       case FUNCDETAIL_WINDOWFUNC:
+                               ereport(ERROR,
+                                               
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                                errmsg("function %s must not 
be a window function",
+                                                               
func_signature_string(names, nargs, NIL,
+                                                                               
                          argtypes))));
+                               break;
+
+                       case FUNCDETAIL_COERCION:
+                               /*
+                                * Should never be yielded from func_get_detail 
if it is passed
+                                * fargs == NIL, as it is previously.
+                                */
+                               Assert(false);
+                               break;
+
+                       case FUNCDETAIL_MULTIPLE:
+                               /*
+                                * Only support one signature, thus overloading 
of a name with
+                                * different types should never occur.
+                                */
+                               Assert(false);
+                               break;
+
+               }
+
+               fmgr_info(funcid, flinfo);
+               InitFunctionCallInfoData(cstate->output_fcinfo, flinfo,
+                                                                1, NULL, NULL);
+       }
+       else
+               /* Unexpected type was found for cstate->destination. */
+               Assert(false);
+
+
        PG_TRY();
        {
                if (cstate->fe_copy)
@@ -1310,13 +1441,13 @@ DoCopyTo(CopyState cstate)
        }
        PG_END_TRY();
 
-       if (!pipe)
+       if (!pipe && cstate->copy_dest != COPY_FN)
        {
                if (FreeFile(cstate->copy_file))
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not write to file 
\"%s\": %m",
-                                                       cstate->filename)));
+                                                       
strVal(cstate->destination))));
        }
 }
 
@@ -1342,6 +1473,13 @@ CopyTo(CopyState cstate)
        /* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
        cstate->fe_msgbuf = makeStringInfo();
 
+       /*
+        * fe_msgbuf is never rebound, so there is only a need to set up the
+        * output_fcinfo once.
+        */
+       cstate->output_fcinfo.arg[0] = PointerGetDatum(cstate->fe_msgbuf);
+       cstate->output_fcinfo.argnull[0] = false;
+
        /* Get info about the columns we need to process. */
        cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * 
sizeof(FmgrInfo));
        foreach(cur, cstate->attnumlist)
@@ -1668,7 +1806,7 @@ limit_printout_length(const char *str)
 static void
 CopyFrom(CopyState cstate)
 {
-       bool            pipe = (cstate->filename == NULL);
+       bool            pipe = (cstate->destination == NULL);
        HeapTuple       tuple;
        TupleDesc       tupDesc;
        Form_pg_attribute *attr;
@@ -1768,19 +1906,21 @@ CopyFrom(CopyState cstate)
        {
                struct stat st;
 
-               cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+               cstate->copy_file = AllocateFile(strVal(cstate->destination),
+                                                                               
 PG_BINARY_R);
 
                if (cstate->copy_file == NULL)
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not open file \"%s\" for 
reading: %m",
-                                                       cstate->filename)));
+                                                       
strVal(cstate->destination))));
 
                fstat(fileno(cstate->copy_file), &st);
                if (S_ISDIR(st.st_mode))
                        ereport(ERROR,
                                        (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                                        errmsg("\"%s\" is a directory", 
cstate->filename)));
+                                        errmsg("\"%s\" is a directory",
+                                                       
strVal(cstate->destination))));
        }
 
        tupDesc = RelationGetDescr(cstate->rel);
@@ -2215,7 +2355,7 @@ CopyFrom(CopyState cstate)
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                                         errmsg("could not read from file 
\"%s\": %m",
-                                                       cstate->filename)));
+                                                       
strVal(cstate->destination))));
        }
 
        /*
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index f045f9c..0914dc9 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -1829,7 +1829,7 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
                                {
                                        CopyStmt   *cstmt = (CopyStmt *) stmt;
 
-                                       if (cstmt->filename == NULL)
+                                       if (cstmt->destination == NULL)
                                        {
                                                my_res = SPI_ERROR_COPY;
                                                goto fail;
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index 8bc72d1..9b39abe 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2485,7 +2485,7 @@ _copyCopyStmt(CopyStmt *from)
        COPY_NODE_FIELD(query);
        COPY_NODE_FIELD(attlist);
        COPY_SCALAR_FIELD(is_from);
-       COPY_STRING_FIELD(filename);
+       COPY_NODE_FIELD(destination);
        COPY_NODE_FIELD(options);
 
        return newnode;
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index 3d65d8b..6ddf226 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1085,7 +1085,7 @@ _equalCopyStmt(CopyStmt *a, CopyStmt *b)
        COMPARE_NODE_FIELD(query);
        COMPARE_NODE_FIELD(attlist);
        COMPARE_SCALAR_FIELD(is_from);
-       COMPARE_STRING_FIELD(filename);
+       COMPARE_NODE_FIELD(destination);
        COMPARE_NODE_FIELD(options);
 
        return true;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 130e6f4..23331ee 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -251,8 +251,7 @@ static TypeName *TableFuncTypeName(List *columns);
 %type <value>  TriggerFuncArg
 %type <node>   TriggerWhen
 
-%type <str>            copy_file_name
-                               database_name access_method_clause 
access_method attr_name
+%type <str>            database_name access_method_clause access_method 
attr_name
                                index_name name cursor_name file_name 
cluster_index_specification
 
 %type <list>   func_name handler_name qual_Op qual_all_Op subquery_Op
@@ -433,6 +432,8 @@ static TypeName *TableFuncTypeName(List *columns);
 %type <ival>   opt_frame_clause frame_extent frame_bound
 
 
+%type <node>   copy_file_or_function_name
+
 /*
  * Non-keyword token types.  These are hard-wired into the "flex" lexer.
  * They must be listed first so that their numeric codes do not depend on
@@ -1977,14 +1978,15 @@ ClosePortalStmt:
  *****************************************************************************/
 
 CopyStmt:      COPY opt_binary qualified_name opt_column_list opt_oids
-                       copy_from copy_file_name copy_delimiter opt_with 
copy_options
+                       copy_from copy_file_or_function_name copy_delimiter 
opt_with
+                       copy_options
                                {
                                        CopyStmt *n = makeNode(CopyStmt);
                                        n->relation = $3;
                                        n->query = NULL;
                                        n->attlist = $4;
                                        n->is_from = $6;
-                                       n->filename = $7;
+                                       n->destination = $7;
 
                                        n->options = NIL;
                                        /* Concatenate user-supplied flags */
@@ -1998,14 +2000,15 @@ CopyStmt:       COPY opt_binary qualified_name 
opt_column_list opt_oids
                                                n->options = 
list_concat(n->options, $10);
                                        $$ = (Node *)n;
                                }
-                       | COPY select_with_parens TO copy_file_name opt_with 
copy_options
+                       | COPY select_with_parens TO copy_file_or_function_name
+                         opt_with copy_options
                                {
                                        CopyStmt *n = makeNode(CopyStmt);
                                        n->relation = NULL;
                                        n->query = $2;
                                        n->attlist = NIL;
                                        n->is_from = false;
-                                       n->filename = $4;
+                                       n->destination = $4;
                                        n->options = $6;
                                        $$ = (Node *)n;
                                }
@@ -2021,10 +2024,17 @@ copy_from:
  * used depends on the direction. (It really doesn't make sense to copy from
  * stdout. We silently correct the "typo".)             - AY 9/94
  */
-copy_file_name:
-                       Sconst                                                  
                { $$ = $1; }
-                       | STDIN                                                 
                { $$ = NULL; }
-                       | STDOUT                                                
                { $$ = NULL; }
+copy_file_or_function_name:
+                       Sconst                                                  
{ $$ = (Node *) makeString($1); }
+
+                       /*
+                        * Note that func_name is not used here because there 
is no need to
+                        * accept the "funcname(TYPES)" construction, as there 
is only one
+                        * valid signature.
+                        */
+                       | FUNCTION qualified_name               { $$ = (Node *) 
$2; }
+                       | STDIN                                                 
{ $$ = NULL; }
+                       | STDOUT                                                
{ $$ = NULL; }
                ;
 
 copy_options: copy_opt_list                                                    
{ $$ = $1; }
diff --git a/src/include/catalog/namespace.h b/src/include/catalog/namespace.h
index d356635..1d801cd 100644
--- a/src/include/catalog/namespace.h
+++ b/src/include/catalog/namespace.h
@@ -94,6 +94,7 @@ extern Oid    LookupExplicitNamespace(const char *nspname);
 
 extern Oid     LookupCreationNamespace(const char *nspname);
 extern Oid     QualifiedNameGetCreationNamespace(List *names, char 
**objname_p);
+extern List *makeNameListFromRangeVar(RangeVar *rangevar);
 extern RangeVar *makeRangeVarFromNameList(List *names);
 extern char *NameListToString(List *names);
 extern char *NameListToQuotedString(List *names);
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index b34300f..203088c 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1293,7 +1293,8 @@ typedef struct CopyStmt
        List       *attlist;            /* List of column names (as Strings), 
or NIL
                                                                 * for all 
columns */
        bool            is_from;                /* TO or FROM */
-       char       *filename;           /* filename, or NULL for STDIN/STDOUT */
+       Node       *destination;        /* filename, or NULL for STDIN/STDOUT, 
or a
+                                                                * function */
        List       *options;            /* List of DefElem nodes */
 } CopyStmt;
 
-- 
1.6.5.3


-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to