Attached is a patch that implements copy_srf(). The function signature mirrors the fields of the internal copy state (cstate) more closely than it mirrors the COPY command's own options (for example, filename + is_program instead of FILENAME or PROGRAM).
CREATE OR REPLACE FUNCTION copy_srf(
    filename text DEFAULT null,
    is_program boolean DEFAULT false,
    format text DEFAULT 'text', /* accepts text, csv, binary */
    delimiter text DEFAULT null,
    null_string text DEFAULT E'\\N',
    header boolean DEFAULT false,
    quote text DEFAULT null,
    escape text DEFAULT null,
    encoding text DEFAULT null)
RETURNS SETOF RECORD

The major utility for this (at least for me) will be in ETLs that currently make heavy use of temp tables and want to either reduce I/O or avoid pg_attribute bloat.

I have not yet implemented STDIN mode, but it's a start.

$ psql test
psql (10devel)
Type "help" for help.

# select * from copy_srf('echo 1,2; echo 4,5',true,'csv') as t(x text, y text);
 x | y
---+---
 1 | 2
 4 | 5
(2 rows)

Time: 1.456 ms

# select * from copy_srf('/tmp/small_file.txt'::text,false,'text') as t(x text, y text);
  x  |  y
-----+-----
 1   | 2
 a   | b
 cde | fgh
(3 rows)

On Mon, Oct 17, 2016 at 2:33 PM, Merlin Moncure <mmonc...@gmail.com> wrote:
> On Fri, Sep 30, 2016 at 9:56 PM, Tom Lane <t...@sss.pgh.pa.us> wrote:
> > Craig Ringer <craig.rin...@2ndquadrant.com> writes:
> >> On 1 Oct. 2016 05:20, "Tom Lane" <t...@sss.pgh.pa.us> wrote:
> >>> I think the last of those suggestions has come up before. It has the
> >>> large advantage that you don't have to remember a different syntax for
> >>> copy-as-a-function.
> >
> >> That sounds fantastic. It'd help this copy variant retain feature parity
> >> with normal copy. And it'd bring us closer to being able to FETCH in non
> >> queries.
> >
> > On second thought, though, this couldn't exactly duplicate the existing
> > COPY syntax, because COPY relies heavily on the rowtype of the named
> > target table to tell it what it's copying. You'd need some new syntax
> > to provide the list of column names and types, which puts a bit of
> > a hole in the "syntax we already know" argument. A SRF-returning-record
> > would have a leg up on that, because we do have existing syntax for
> > defining the concrete rowtype that any particular call returns.
>
> One big disadvantage of SRF-returning-record syntax is that functions
> are basically unwrappable with generic wrappers sans major gymnastics
> such as dynamically generating the query and executing it. This is a
> major disadvantage relative to the null::type hack we use in the
> populate_record style functions and perhaps ought to make this
> (SRF-returning-record syntax) style of use discouraged for useful
> library functions. If there were a way to handle wrapping I'd
> withdraw this minor objection (this has come up in dblink
> discussions a few times).
>
> merlin
>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ada2142..0876ee1 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1006,6 +1006,21 @@ LANGUAGE INTERNAL
 STRICT IMMUTABLE PARALLEL SAFE
 AS 'jsonb_insert';
 
+CREATE OR REPLACE FUNCTION copy_srf(
+    IN filename text DEFAULT null,
+    IN is_program boolean DEFAULT false,
+    IN format text DEFAULT 'text',
+    IN delimiter text DEFAULT null,
+    IN null_string text DEFAULT E'\\N',
+    IN header boolean DEFAULT false,
+    IN quote text DEFAULT null,
+    IN escape text DEFAULT null,
+    IN encoding text DEFAULT null)
+RETURNS SETOF RECORD
+LANGUAGE INTERNAL
+VOLATILE ROWS 1000 COST 1000 CALLED ON NULL INPUT
+AS 'copy_srf';
+
 -- The default permissions for functions mean that anyone can execute them.
 -- A number of functions shouldn't be executable by just anyone, but rather
 -- than use explicit 'superuser()' checks in those functions, we use the GRANT
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index b4140eb..90ed2c5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -30,6 +30,7 @@
 #include "commands/defrem.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
+#include "funcapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
@@ -555,7 +556,6 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
 						 errmsg("could not read from COPY file: %m")));
 			break;
 		case COPY_OLD_FE:
-
 			/*
 			 * We cannot read more than minread bytes (which in practice is 1)
 			 * because old protocol doesn't have any clear way of separating
@@ -4555,3 +4555,348 @@ CreateCopyDestReceiver(void)
 
 	return (DestReceiver *) self;
 }
+
+/*
+ * copy_srf
+ *		COPY FROM a server-side file or program, as a set-returning function.
+ *
+ * SQL-level signature:
+ *		copy_srf(filename text default null,
+ *				 is_program boolean default false,
+ *				 format text default 'text',
+ *				 delimiter text default null,
+ *				 null_string text default '\N',
+ *				 header boolean default false,
+ *				 quote text default null,
+ *				 escape text default null,
+ *				 encoding text default null)
+ *		returns setof record
+ *
+ * A NULL delimiter means tab in text mode and comma in csv mode; a NULL
+ * quote means '"'; a NULL escape means "same as quote"; a NULL encoding
+ * means the client encoding.
+ *
+ * The row type comes from the caller's column definition list, and every
+ * input field is converted with the input function of the matching column.
+ * The whole result is materialized into a tuplestore.
+ *
+ * Reading from STDIN (filename IS NULL) is not implemented and raises an
+ * error.  Because the file is read, or the program is run, by the server
+ * process, only superusers may call this function.
+ */
+Datum
+copy_srf(PG_FUNCTION_ARGS)
+{
+	ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+	TupleDesc	tupdesc;
+	Tuplestorestate *tupstore;
+	MemoryContext per_query_ctx;
+	MemoryContext oldcontext;
+	FmgrInfo   *in_functions;
+	Oid		   *typioparams;
+	Oid			in_func_oid;
+	CopyStateData copy_state;
+	int			col;
+	Datum	   *values;
+	bool	   *nulls;
+
+	/* check to see if caller supports us returning a tuplestore */
+	if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("set-valued function called in context that cannot accept a set")));
+
+	if (!(rsinfo->allowedModes & SFRM_Materialize) ||
+		rsinfo->expectedDesc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("materialize mode required, but it is not allowed in this context")));
+
+	/*
+	 * The data source is opened by the server process, so apply the same
+	 * restriction COPY applies to server-side files and programs.
+	 */
+	if (!superuser())
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+				 errmsg("must be superuser to use copy_srf")));
+
+	/* STDIN mode is not implemented yet, so a data source is mandatory */
+	if (PG_ARGISNULL(0))
+		ereport(ERROR,
+				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+				 errmsg("copy_srf cannot read from STDIN"),
+				 errhint("Pass a file name or a command as the first argument.")));
+
+	/* the tuplestore and its descriptor must outlive this call */
+	per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+	oldcontext = MemoryContextSwitchTo(per_query_ctx);
+	tupdesc = CreateTupleDescCopy(rsinfo->expectedDesc);
+	tupstore = tuplestore_begin_heap(true, false, work_mem);
+	MemoryContextSwitchTo(oldcontext);
+
+	values = (Datum *) palloc(tupdesc->natts * sizeof(Datum));
+	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
+	in_functions = (FmgrInfo *) palloc(tupdesc->natts * sizeof(FmgrInfo));
+	typioparams = (Oid *) palloc(tupdesc->natts * sizeof(Oid));
+
+	for (col = 0; col < tupdesc->natts; col++)
+	{
+		getTypeInputInfo(tupdesc->attrs[col]->atttypid,
+						 &in_func_oid, &typioparams[col]);
+		fmgr_info(in_func_oid, &in_functions[col]);
+	}
+
+	/*
+	 * Mock up a copy state.  Zero it first so that fields which are not set
+	 * explicitly below (cur_lineno, for one, is reported in an error message
+	 * further down) start out well-defined instead of holding stack garbage.
+	 */
+	MemSet(&copy_state, 0, sizeof(copy_state));
+	initStringInfo(&copy_state.line_buf);
+	initStringInfo(&copy_state.attribute_buf);
+	/* raw input buffer, allocated the way BeginCopyFrom() allocates it */
+	copy_state.raw_buf = (char *) palloc(RAW_BUF_SIZE + 1);
+	copy_state.fe_msgbuf = makeStringInfo();
+	copy_state.eol_type = EOL_UNKNOWN;
+	copy_state.copy_dest = COPY_FILE;
+
+	/* param 0: filename, or the command line when is_program is true */
+	copy_state.filename = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+	/* param 1: is_program */
+	copy_state.is_program = PG_ARGISNULL(1) ? false : PG_GETARG_BOOL(1);
+
+	/*
+	 * param 2: format - text, csv, binary (NULL means text)
+	 *
+	 * NOTE(review): rows are fetched with NextCopyFromRawFields() below,
+	 * which looks like a text/CSV-only reader -- confirm that binary input
+	 * really works before advertising it.
+	 */
+	if (!PG_ARGISNULL(2))
+	{
+		char	   *format_str = text_to_cstring(PG_GETARG_TEXT_PP(2));
+
+		if (strcmp(format_str, "csv") == 0)
+			copy_state.csv_mode = true;
+		else if (strcmp(format_str, "binary") == 0)
+			copy_state.binary = true;
+		else if (strcmp(format_str, "text") != 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("Format must be one of: text csv binary")));
+	}
+
+	/* param 3: delimiter - NULL means tab in text mode, comma in csv mode */
+	if (PG_ARGISNULL(3))
+		copy_state.delim = copy_state.csv_mode ? "," : "\t";
+	else
+	{
+		if (copy_state.binary)
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("cannot specify DELIMITER in BINARY mode")));
+
+		copy_state.delim = text_to_cstring(PG_GETARG_TEXT_PP(3));
+
+		if (strlen(copy_state.delim) != 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY delimiter must be a single one-byte character")));
+
+		/* Disallow end-of-line characters */
+		if (strchr(copy_state.delim, '\r') != NULL ||
+			strchr(copy_state.delim, '\n') != NULL)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("COPY delimiter cannot be newline or carriage return")));
+	}
+
+	/* param 4: null_string - NULL means "" in csv mode, \N otherwise */
+	if (PG_ARGISNULL(4))
+		copy_state.null_print = copy_state.csv_mode ? "" : "\\N";
+	else
+		copy_state.null_print = text_to_cstring(PG_GETARG_TEXT_PP(4));
+	copy_state.null_print_len = strlen(copy_state.null_print);
+
+	/* param 5: header */
+	copy_state.header_line = PG_ARGISNULL(5) ? false : PG_GETARG_BOOL(5);
+
+	/* param 6: quote - NULL means '"'; may only be given in csv mode */
+	if (PG_ARGISNULL(6))
+		copy_state.quote = "\"";
+	else
+	{
+		if (!copy_state.csv_mode)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY quote available only in CSV mode")));
+
+		/* fetch the argument first: the checks below read it */
+		copy_state.quote = text_to_cstring(PG_GETARG_TEXT_PP(6));
+
+		if (strlen(copy_state.quote) != 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY quote must be a single one-byte character")));
+
+		if (copy_state.delim[0] == copy_state.quote[0])
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("COPY delimiter and quote must be different")));
+	}
+
+	/* param 7: escape - NULL means "same as quote"; csv mode only */
+	if (PG_ARGISNULL(7))
+		copy_state.escape = copy_state.quote;
+	else
+	{
+		if (!copy_state.csv_mode)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY escape available only in CSV mode")));
+
+		copy_state.escape = text_to_cstring(PG_GETARG_TEXT_PP(7));
+
+		if (strlen(copy_state.escape) != 1)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY escape must be a single one-byte character")));
+	}
+
+	/* param 8: encoding - NULL means the client encoding */
+	if (PG_ARGISNULL(8))
+		copy_state.file_encoding = pg_get_client_encoding();
+	else
+	{
+		char	   *encoding = text_to_cstring(PG_GETARG_TEXT_PP(8));
+
+		copy_state.file_encoding = pg_char_to_encoding(encoding);
+		if (copy_state.file_encoding < 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+					 errmsg("argument to option \"%s\" must be a valid encoding name",
+							encoding)));
+	}
+
+	/*
+	 * Derive the transcoding flags from the encoding chosen above; leaving
+	 * them hard-wired to false would make the encoding argument a no-op.
+	 */
+	copy_state.need_transcoding =
+		(copy_state.file_encoding != GetDatabaseEncoding() ||
+		 pg_database_encoding_max_length() > 1);
+	copy_state.encoding_embeds_ascii =
+		PG_ENCODING_IS_CLIENT_ONLY(copy_state.file_encoding);
+
+	copy_state.max_fields = tupdesc->natts;
+	copy_state.raw_fields = (char **) palloc(tupdesc->natts * sizeof(char *));
+
+	/* open "file" */
+	if (copy_state.is_program)
+	{
+		copy_state.copy_file = OpenPipeStream(copy_state.filename, PG_BINARY_R);
+		if (copy_state.copy_file == NULL)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not execute command \"%s\": %m",
+							copy_state.filename)));
+	}
+	else
+	{
+		struct stat st;
+
+		copy_state.copy_file = AllocateFile(copy_state.filename, PG_BINARY_R);
+		if (copy_state.copy_file == NULL)
+		{
+			/* copy errno because ereport subfunctions might change it */
+			int			save_errno = errno;
+
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\" for reading: %m",
+							copy_state.filename),
+					 (save_errno == ENOENT || save_errno == EACCES) ?
+					 errhint("copy_srf instructs the PostgreSQL server process to read a file. "
+							 "You may want a client-side facility such as psql's \\copy.") : 0));
+		}
+
+		if (fstat(fileno(copy_state.copy_file), &st))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							copy_state.filename)));
+
+		if (S_ISDIR(st.st_mode))
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("\"%s\" is a directory", copy_state.filename)));
+	}
+
+	/* convert each input line into a tuple of the caller's row type */
+	for (;;)
+	{
+		char	  **field_strings;
+		int			field_strings_count;
+		HeapTuple	tuple;
+
+		if (!NextCopyFromRawFields(&copy_state, &field_strings,
+								   &field_strings_count))
+			break;
+
+		if (field_strings_count != tupdesc->natts)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					 errmsg("found %d fields but expected %d on line %d",
+							field_strings_count, tupdesc->natts,
+							copy_state.cur_lineno)));
+
+		for (col = 0; col < tupdesc->natts; col++)
+		{
+			/* a NULL field string denotes SQL NULL */
+			values[col] = InputFunctionCall(&in_functions[col],
+											field_strings[col],
+											typioparams[col],
+											tupdesc->attrs[col]->atttypmod);
+			nulls[col] = (field_strings[col] == NULL);
+		}
+
+		/* the tuplestore keeps its own copy, so release ours right away */
+		tuple = heap_form_tuple(tupdesc, values, nulls);
+		tuplestore_puttuple(tupstore, tuple);
+		heap_freetuple(tuple);
+	}
+
+	/* close "file" */
+	if (copy_state.is_program)
+	{
+		int			pclose_rc;
+
+		pclose_rc = ClosePipeStream(copy_state.copy_file);
+		if (pclose_rc == -1)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close pipe to external command: %m")));
+		else if (pclose_rc != 0)
+			ereport(ERROR,
+					(errcode(ERRCODE_EXTERNAL_ROUTINE_EXCEPTION),
+					 errmsg("program \"%s\" failed",
+							copy_state.filename),
+					 errdetail_internal("%s", wait_result_to_str(pclose_rc))));
+	}
+	else if (FreeFile(copy_state.copy_file))
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not close file \"%s\": %m",
+						copy_state.filename)));
+
+	tuplestore_donestoring(tupstore);
+
+	/* let the caller know we're sending back a tuplestore */
+	rsinfo->returnMode = SFRM_Materialize;
+	rsinfo->setResult = tupstore;
+	rsinfo->setDesc = tupdesc;
+
+	return (Datum) 0;
+}
diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h
index 17ec71d..d8076ee 100644
--- a/src/include/catalog/pg_proc.h
+++ b/src/include/catalog/pg_proc.h
@@ -5341,6 +5341,11 @@ DESCR("pg_controldata recovery state information as a function");
 
 DATA(insert OID = 3444 ( pg_control_init PGNSP PGUID 12 1 0 0 0 f f f f t f v s 0 0 2249 "" "{23,23,23,23,23,23,23,23,23,16,16,16,23}" "{o,o,o,o,o,o,o,o,o,o,o,o,o}" "{max_data_alignment,database_block_size,blocks_per_segment,wal_block_size,bytes_per_wal_segment,max_identifier_length,max_index_columns,max_toast_chunk_size,large_object_chunk_size,bigint_timestamps,float4_pass_by_value,float8_pass_by_value,data_page_checksum_version}" _null_ _null_ pg_control_init _null_ _null_ _null_ ));
 DESCR("pg_controldata init state information as a function");
+
+DATA(insert OID = 3445 ( copy_srf PGNSP PGUID 12 1 0 0 0 f f f f f t v u 9 0 2249 "25 16 25 25 25 16 25 25 25" _null_ _null_ _null_ _null_ _null_ copy_srf _null_ _null_ _null_ ));
+DESCR("set-returning COPY proof of concept");
+
+
 /*
  * Symbolic values for provolatile column: these indicate whether the result
  * of a function is dependent *only* on the values of its explicit arguments,
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index 65eb347..c27baac 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -37,4 +37,7 @@ extern void CopyFromErrorCallback(void *arg);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern Datum copy_srf(PG_FUNCTION_ARGS);
+
+
 #endif   /* COPY_H */
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org) To make changes to your subscription: http://www.postgresql.org/mailpref/pgsql-hackers