On Wed, Jan 25, 2017 at 06:16:16AM -0800, David Fetter wrote:
> On Mon, Oct 31, 2016 at 04:45:40PM -0400, Corey Huinker wrote:
> > Attached is a patch that implements copy_srf().
> > 
> > The function signature reflects cstate more than it reflects the COPY
> > options (filename+is_program instead of FILENAME or PROGRAM, etc.)
> 
> The patch as it stands needs a rebase.  I'll see what I can do today.

Please find attached a rebase, which fixes an OID collision that crept in.

- The patch builds against master and passes "make check".
- The patch does not contain user-visible documentation or examples.
- The patch does not contain tests.

I got the following when I did a brief test.

SELECT * FROM copy_srf(filename => 'ls', is_program => true) AS t(l text);
server closed the connection unexpectedly
        This probably means the server terminated abnormally
        before or while processing the request.
The connection to the server was lost. Attempting reset: Failed.

Best,
David.
-- 
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778  AIM: dfetter666  Yahoo!: dfetter
Skype: davidfetter      XMPP: david(dot)fetter(at)gmail(dot)com

Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 4dfedf8..ae07cfb 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1065,6 +1065,21 @@ LANGUAGE INTERNAL
 STRICT IMMUTABLE PARALLEL SAFE
 AS 'jsonb_insert';
 
+CREATE OR REPLACE FUNCTION copy_srf(
+       IN filename text DEFAULT null,     -- required: server file, or command if is_program
+       IN is_program boolean DEFAULT false,
+       IN format text DEFAULT 'text',     -- 'text' or 'csv'
+       IN delimiter text DEFAULT null,    -- null: tab in text mode, comma in csv mode
+       IN null_string text DEFAULT null,  -- null: \N in text mode, empty string in csv mode
+       IN header boolean DEFAULT false,
+       IN quote text DEFAULT null,        -- csv mode only; null: double quote
+       IN escape text DEFAULT null,       -- csv mode only; null: same as quote
+       IN encoding text DEFAULT null)     -- null: client encoding
+RETURNS SETOF RECORD
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000 CALLED ON NULL INPUT
+AS 'copy_srf';
+
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
 -- than use explicit 'superuser()' checks in those functions, we use the GRANT
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index f9362be..8e1bd39 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -30,6 +30,7 @@
 #include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
@@ -562,7 +563,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
                                                 errmsg("could not read from 
COPY file: %m")));
                        break;
                case COPY_OLD_FE:
-
                        /*
                         * We cannot read more than minread bytes (which in 
practice is 1)
                         * because old protocol doesn't have any clear way of 
separating
@@ -4740,3 +4740,377 @@ CreateCopyDestReceiver(void)
 
        return (DestReceiver *) self;
 }
+
+/*
+ * copy_srf_parse_options
+ *             Fill in the COPY options of *cstate from copy_srf()'s arguments
+ *             and validate them the way COPY FROM validates its options.
+ *
+ * Argument positions follow the SQL-level signature:
+ *     0 filename, 1 is_program, 2 format, 3 delimiter, 4 null_string,
+ *     5 header, 6 quote, 7 escape, 8 encoding.
+ * A NULL argument selects the default COPY uses for the chosen format.
+ * Raises ERROR on a missing filename, an unsupported format, or an invalid
+ * option combination.
+ */
+static void
+copy_srf_parse_options(FunctionCallInfo fcinfo, CopyState cstate)
+{
+       /*
+        * param 0: filename.  Reading from the client (COPY FROM STDIN) needs
+        * the COPY protocol handshake, which a set-returning function cannot
+        * perform, so a file name or command is mandatory.
+        */
+       if (PG_ARGISNULL(0))
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("copy_srf requires a file name or program")));
+       cstate->copy_dest = COPY_FILE;
+       cstate->filename = TextDatumGetCString(PG_GETARG_DATUM(0));
+
+       /* param 1: is_program */
+       cstate->is_program = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
+
+       /* param 2: format -- text (default) or csv */
+       cstate->binary = false;
+       cstate->csv_mode = false;
+       if (!PG_ARGISNULL(2))
+       {
+               char       *fmt = TextDatumGetCString(PG_GETARG_DATUM(2));
+
+               if (strcmp(fmt, "csv") == 0)
+                       cstate->csv_mode = true;
+               else if (strcmp(fmt, "binary") == 0)
+               {
+                       /* NextCopyFromRawFields() only parses text and csv input */
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("copy_srf does not support binary format")));
+               }
+               else if (strcmp(fmt, "text") != 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("COPY format \"%s\" not recognized", fmt)));
+       }
+
+       /* param 3: delimiter -- tab in text mode, comma in csv mode */
+       if (PG_ARGISNULL(3))
+               cstate->delim = cstate->csv_mode ? "," : "\t";
+       else
+               cstate->delim = TextDatumGetCString(PG_GETARG_DATUM(3));
+
+       /* param 4: null_string -- \N in text mode, empty string in csv mode */
+       if (PG_ARGISNULL(4))
+               cstate->null_print = cstate->csv_mode ? "" : "\\N";
+       else
+               cstate->null_print = TextDatumGetCString(PG_GETARG_DATUM(4));
+       cstate->null_print_len = strlen(cstate->null_print);
+
+       /* param 5: header */
+       cstate->header_line = PG_ARGISNULL(5) ? false : PG_GETARG_BOOL(5);
+
+       /* params 6 and 7: quote and escape may only be given in csv mode */
+       if (!cstate->csv_mode && !PG_ARGISNULL(6))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("COPY quote available only in CSV mode")));
+       if (!cstate->csv_mode && !PG_ARGISNULL(7))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("COPY escape available only in CSV mode")));
+
+       /* Assign before validating: the checks below dereference these. */
+       if (PG_ARGISNULL(6))
+               cstate->quote = "\"";
+       else
+               cstate->quote = TextDatumGetCString(PG_GETARG_DATUM(6));
+
+       /* escape defaults to whatever the quote character is */
+       if (PG_ARGISNULL(7))
+               cstate->escape = cstate->quote;
+       else
+               cstate->escape = TextDatumGetCString(PG_GETARG_DATUM(7));
+
+       /* param 8: encoding -- defaults to the client encoding */
+       if (PG_ARGISNULL(8))
+               cstate->file_encoding = pg_get_client_encoding();
+       else
+       {
+               char       *encoding = TextDatumGetCString(PG_GETARG_DATUM(8));
+
+               cstate->file_encoding = pg_char_to_encoding(encoding);
+               if (cstate->file_encoding < 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("argument to option \"%s\" must be a valid encoding name",
+                                                       "encoding")));
+       }
+
+       /*
+        * Decide whether each input line must be converted to the server
+        * encoding, following COPY's own rule.  Without this the encoding
+        * argument would have no effect.
+        */
+       cstate->need_transcoding =
+               (cstate->file_encoding != GetDatabaseEncoding() ||
+                pg_database_encoding_max_length() > 1);
+       cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
+
+       /* Validate the option combination, as COPY does */
+       if (strlen(cstate->delim) != 1)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("COPY delimiter must be a single one-byte character")));
+
+       /* Disallow end-of-line characters */
+       if (strchr(cstate->delim, '\r') != NULL ||
+               strchr(cstate->delim, '\n') != NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("COPY delimiter cannot be newline or carriage return")));
+
+       if (strchr(cstate->null_print, '\r') != NULL ||
+               strchr(cstate->null_print, '\n') != NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("COPY null representation cannot use newline or carriage return")));
+
+       /*
+        * In text mode a backslash delimiter would be ambiguous.  Delimiter
+        * characters inside data are backslash-escaped, so the delimiter also
+        * must not collide with backslash sequences such as \. , \n or \123.
+        */
+       if (!cstate->csv_mode &&
+               strchr("\\.abcdefghijklmnopqrstuvwxyz0123456789", cstate->delim[0]) != NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("COPY delimiter cannot be \"%s\"", cstate->delim)));
+
+       if (cstate->csv_mode)
+       {
+               if (strlen(cstate->quote) != 1)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("COPY quote must be a single one-byte character")));
+
+               if (cstate->delim[0] == cstate->quote[0])
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("COPY delimiter and quote must be different")));
+
+               if (strlen(cstate->escape) != 1)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("COPY escape must be a single one-byte character")));
+
+               if (strchr(cstate->null_print, cstate->quote[0]) != NULL)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                        errmsg("CSV quote character must not appear in the NULL specification")));
+       }
+
+       if (strchr(cstate->null_print, cstate->delim[0]) != NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                                errmsg("COPY delimiter must not appear in the NULL specification")));
+}
+
+/*
+ * copy_srf_open
+ *             Open cstate->filename for reading.  If cstate->is_program is set
+ *             it is run as a shell command and its output is read; otherwise
+ *             it is opened as a server-side file.  Raises ERROR if it cannot
+ *             be opened or names a directory.
+ */
+static void
+copy_srf_open(CopyState cstate)
+{
+       if (cstate->is_program)
+       {
+               cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
+               if (cstate->copy_file == NULL)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not execute command \"%s\": %m",
+                                                       cstate->filename)));
+       }
+       else
+       {
+               struct stat st;
+
+               cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
+               if (cstate->copy_file == NULL)
+               {
+                       /* copy errno because ereport subfunctions might change it */
+                       int                     save_errno = errno;
+
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not open file \"%s\" for reading: %m",
+                                                       cstate->filename),
+                                        (save_errno == ENOENT || save_errno == EACCES) ?
+                                        errhint("copy_srf instructs the PostgreSQL server process to read a file. "
+                                                        "You may want a client-side facility such as psql's \\copy.") : 0));
+               }
+
+               if (fstat(fileno(cstate->copy_file), &st))
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not stat file \"%s\": %m",
+                                                       cstate->filename)));
+
+               if (S_ISDIR(st.st_mode))
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                                        errmsg("\"%s\" is a directory", cstate->filename)));
+       }
+}
+
+/*
+ * copy_srf_close
+ *             Close the stream opened by copy_srf_open().  Raises ERROR if the
+ *             close fails or the program reported a non-zero exit status.
+ */
+static void
+copy_srf_close(CopyState cstate)
+{
+       if (cstate->is_program)
+       {
+               int                     pclose_rc = ClosePipeStream(cstate->copy_file);
+
+               if (pclose_rc == -1)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not close pipe to external command: %m")));
+               else if (pclose_rc != 0)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+                                        errmsg("program \"%s\" failed",
+                                                       cstate->filename),
+                                        errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+       }
+       else if (FreeFile(cstate->copy_file))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not close file \"%s\": %m",
+                                               cstate->filename)));
+}
+
+/*
+ * copy_srf
+ *             Set-returning function that reads text- or csv-format COPY data
+ *             from a server-side file, or from the output of a program, and
+ *             returns it as rows of the caller's column definition list, e.g.
+ *                     SELECT * FROM copy_srf('/path/data.csv', format => 'csv')
+ *                             AS t(a int, b text);
+ *
+ * Each field is converted with its column type's input function.  The result
+ * is materialized into a tuplestore.  Only superusers may call it, because,
+ * like COPY FROM a file or PROGRAM, it reads arbitrary server files and runs
+ * arbitrary commands.
+ */
+Datum
+copy_srf(PG_FUNCTION_ARGS)
+{
+       ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+       TupleDesc       tupdesc;
+       Tuplestorestate *tupstore;
+       MemoryContext per_query_ctx;
+       MemoryContext row_ctx;
+       MemoryContext oldcontext;
+       FmgrInfo   *in_functions;
+       Oid                *typioparams;
+       Datum      *values;
+       bool       *nulls;
+       CopyStateData copy_state;
+       char      **field_strings;
+       int                     field_strings_count;
+       int                     col;
+
+       /* Same privilege requirement as COPY FROM a file or program */
+       if (!superuser())
+               ereport(ERROR,
+                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                errmsg("must be superuser to use copy_srf")));
+
+       /* check to see if caller supports us returning a tuplestore */
+       if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("set-valued function called in context that cannot accept a set")));
+
+       if (!(rsinfo->allowedModes & SFRM_Materialize) || rsinfo->expectedDesc == NULL)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("materialize mode required, but it is not allowed in this context")));
+
+       /*
+        * The result descriptor and the tuplestore are handed back to the
+        * caller, so build them in the per-query context.
+        */
+       per_query_ctx = fcinfo->flinfo->fn_mcxt;
+       oldcontext = MemoryContextSwitchTo(per_query_ctx);
+       tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+       tupstore = tuplestore_begin_heap((rsinfo->allowedModes & SFRM_Materialize_Random) != 0,
+                                                                        false, work_mem);
+       MemoryContextSwitchTo(oldcontext);
+
+       /* Look up the input function of every result column */
+       values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+       nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
+       in_functions = (FmgrInfo *) palloc(tupdesc->natts * sizeof(FmgrInfo));
+       typioparams = (Oid *) palloc(tupdesc->natts * sizeof(Oid));
+       for (col = 0; col < tupdesc->natts; col++)
+       {
+               Oid                     in_func_oid;
+
+               getTypeInputInfo(tupdesc->attrs[col]->atttypid,
+                                                &in_func_oid, &typioparams[col]);
+               fmgr_info(in_func_oid, &in_functions[col]);
+       }
+
+       /*
+        * Mock up a copy state.  Start from all-zeroes so that every field the
+        * COPY reading code consults but we do not set (buffer positions, line
+        * counter, EOF flags, rel, queryDesc, ...) has a defined value.  Then
+        * allocate the input buffers COPY FROM would allocate.
+        */
+       memset(&copy_state, 0, sizeof(copy_state));
+       initStringInfo(&copy_state.line_buf);
+       initStringInfo(&copy_state.attribute_buf);
+       copy_state.raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+       copy_state.raw_buf_index = 0;
+       copy_state.raw_buf_len = 0;
+       copy_state.eol_type = EOL_UNKNOWN;
+       copy_state.cur_lineno = 0;
+       copy_state.max_fields = tupdesc->natts;
+       copy_state.raw_fields = (char **) palloc(tupdesc->natts * sizeof(char *));
+
+       copy_srf_parse_options(fcinfo, &copy_state);
+
+       /* Scratch memory for input-function results, reset for every row */
+       row_ctx = AllocSetContextCreate(CurrentMemoryContext,
+                                                                       "copy_srf row context",
+                                                                       ALLOCSET_DEFAULT_SIZES);
+
+       copy_srf_open(&copy_state);
+
+       while (NextCopyFromRawFields(&copy_state, &field_strings, &field_strings_count))
+       {
+               if (field_strings_count != tupdesc->natts)
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+                                        errmsg("found %d fields but expected %d on line %d",
+                                                       field_strings_count, tupdesc->natts,
+                                                       copy_state.cur_lineno)));
+
+               MemoryContextReset(row_ctx);
+               oldcontext = MemoryContextSwitchTo(row_ctx);
+               for (col = 0; col < tupdesc->natts; col++)
+               {
+                       values[col] = InputFunctionCall(&in_functions[col],
+                                                                                       field_strings[col],
+                                                                                       typioparams[col],
+                                                                                       tupdesc->attrs[col]->atttypmod);
+                       nulls[col] = (field_strings[col] == NULL);
+               }
+               MemoryContextSwitchTo(oldcontext);
+
+               /* forms the row directly in the tuplestore's own memory */
+               tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+       }
+
+       copy_srf_close(&copy_state);
+       MemoryContextDelete(row_ctx);
+
+       /* let the caller know we're sending back a tuplestore */
+       tuplestore_donestoring(tupstore);
+       rsinfo->returnMode = SFRM_Materialize;
+       rsinfo->setResult = tupstore;
+       rsinfo->setDesc = tupdesc;
+
+       return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index c1f492b..9fb2ff7 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5359,6 +5359,10 @@ DESCR("pg_controldata init state information as a function");
 DATA(insert OID = 3445 ( pg_import_system_collations PGNSP PGUID 12 100 0 0 0 
f f f f t f v r 2 0 2278 "16 4089" _null_ _null_ "{if_not_exists,schema}" 
_null_ _null_ pg_import_system_collations _null_ _null_ _null_ ));
 DESCR("import collations from operating system");
 
+DATA(insert OID = 3353 (  copy_srf PGNSP PGUID 12 1 0 0 0 f f f f f t v u 9 0 
2249 "25 16 25 25 25 16 25 25 25" _null_ _null_ _null_ _null_ _null_ copy_srf 
_null_ _null_ _null_ ));
+DESCR("set-returning COPY proof of concept");
+
+
 /*
  * Symbolic values for provolatile column: these indicate whether the result
  * of a function is dependent *only* on the values of its explicit arguments,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index d63ca0f..de09841 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -38,4 +38,7 @@ extern void CopyFromErrorCallback(void *arg);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern Datum copy_srf(PG_FUNCTION_ARGS);
+
+
 #endif   /* COPY_H */
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to