Hi,

I want to work on making COPY format extendable. I've attached the first patch for it. I'll send more patches after this one is merged.

Background:

Currently, COPY TO/FROM supports only the "text", "csv" and "binary" formats. There have been several requests to support more COPY formats. For example:

* 2023-11: JSON and JSON lines [1]
* 2022-04: Apache Arrow [2]
* 2018-02: Apache Avro, Apache Parquet and Apache ORC [3]

(FYI: I want to add support for Apache Arrow.)

There were discussions about how to add support for more formats. [3][4] In these discussions, we reached a consensus on making COPY format extendable.

But it seems that nobody is working on this yet. So I want to work on it. (If anyone wants to work on this together, I'd be happy to.)

Summary:

The attached patch introduces a CopyToFormatOps struct that is similar to TupleTableSlotOps for TupleTableSlot, but CopyToFormatOps is for COPY TO formats. CopyToFormatOps has the routines needed to implement a COPY TO format.

The attached patch doesn't change:

* the current behavior (all existing tests still pass without any changes)
* the existing "text", "csv" and "binary" format output implementations, including local variable names (the attached patch just moves them and adjusts indentation)
* performance (no significant loss of performance)

In other words, this is just a refactoring for further changes to make COPY format extendable. If I used a "complete the whole task and then request reviews for it" approach, it would be difficult to review because the changes would be large. So I want to work on this step by step. Is that acceptable?

TODOs that should be done in subsequent patches:

* Add some CopyToState readers such as CopyToStateGetDest(), CopyToStateGetAttnums() and CopyToStateGetOpts() (We will need to consider which APIs should be exported.) (This is for implementing a COPY TO format in an extension.)
* Export CopySend*() in src/backend/commands/copyto.c (This is for implementing a COPY TO format in an extension.)
* Add API to register a new COPY TO format implementation
* Add "CREATE XXX" to register a new COPY TO format (or COPY TO/FROM format) implementation ("CREATE COPY HANDLER" was suggested in [5].)
* Same for COPY FROM

Performance:

We reached a consensus on making COPY format extendable, but we should care about performance. [6]

> I think that step 1 ought to be to convert the existing
> formats into plug-ins, and demonstrate that there's no
> significant loss of performance.

So I measured COPY TO time with and without this change. You can see there is no significant loss of performance.

Data: Random 32 bit integers:

    CREATE TABLE data (int32 integer);
    INSERT INTO data
      SELECT random() * 10000
        FROM generate_series(1, ${n_records});

The number of records: 100K, 1M and 10M

100K without this change:

    format,elapsed time (ms)
    text,22.527
    csv,23.822
    binary,24.806

100K with this change:

    format,elapsed time (ms)
    text,22.919
    csv,24.643
    binary,24.705

1M without this change:

    format,elapsed time (ms)
    text,223.457
    csv,233.583
    binary,242.687

1M with this change:

    format,elapsed time (ms)
    text,224.591
    csv,233.964
    binary,247.164

10M without this change:

    format,elapsed time (ms)
    text,2330.383
    csv,2411.394
    binary,2590.817

10M with this change:

    format,elapsed time (ms)
    text,2231.307
    csv,2408.067
    binary,2473.617

[1]: https://www.postgresql.org/message-id/flat/24e3ee88-ec1e-421b-89ae-8a47ee0d2df1%40joeconway.com#a5e6b8829f9a74dfc835f6f29f2e44c5
[2]: https://www.postgresql.org/message-id/flat/CAGrfaBVyfm0wPzXVqm0%3Dh5uArYh9N_ij%2BsVpUtDHqkB%3DVyB3jw%40mail.gmail.com
[3]: https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
[4]: https://www.postgresql.org/message-id/flat/3741749.1655952719%40sss.pgh.pa.us#2bb7af4a3d2c7669f9a49808d777a20d
[5]: https://www.postgresql.org/message-id/20180211211235.5x3jywe5z3lkgcsr%40alap3.anarazel.de
[6]: https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us

Thanks,
-- 
kou

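To make the "no significant loss of performance" reading concrete, the timings in this mail can be turned into relative differences. The following is only a minimal sketch and not part of the patch: the names timings, percent_change() and print_percent_changes() are made up for illustration, and the numbers are copied from the measurements in this mail.

```c
#include <assert.h>
#include <stdio.h>

/* One measurement pair copied from the benchmark in this mail. */
struct timing
{
	const char *records;
	const char *format;
	double		before_ms;		/* without this change */
	double		after_ms;		/* with this change */
};

static const struct timing timings[] = {
	{"100K", "text", 22.527, 22.919},
	{"100K", "csv", 23.822, 24.643},
	{"100K", "binary", 24.806, 24.705},
	{"1M", "text", 223.457, 224.591},
	{"1M", "csv", 233.583, 233.964},
	{"1M", "binary", 242.687, 247.164},
	{"10M", "text", 2330.383, 2231.307},
	{"10M", "csv", 2411.394, 2408.067},
	{"10M", "binary", 2590.817, 2473.617},
};

/* Relative change in percent; a positive value means slower with the patch. */
static double
percent_change(double before_ms, double after_ms)
{
	assert(before_ms > 0.0);
	return (after_ms - before_ms) / before_ms * 100.0;
}

/* Print one CSV line per measurement: records, format, relative change. */
static void
print_percent_changes(void)
{
	size_t		i;

	for (i = 0; i < sizeof(timings) / sizeof(timings[0]); i++)
		printf("%s,%s,%+.2f%%\n", timings[i].records, timings[i].format,
			   percent_change(timings[i].before_ms, timings[i].after_ms));
}
```

Calling print_percent_changes() from a main() prints one line per measurement. With these numbers every difference stays within about 5% in either direction, and all three 10M runs are slightly faster with the change.
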
From 7f00b2b0fb878ae1c687c151dd751512d02ed83e Mon Sep 17 00:00:00 2001
From: Sutou Kouhei <k...@clear-code.com>
Date: Mon, 4 Dec 2023 12:32:54 +0900
Subject: [PATCH v1] Extract COPY TO format implementations

This is a part of making COPY format extendable. See also these past
discussions:
* New Copy Formats - avro/orc/parquet:
  https://www.postgresql.org/message-id/flat/20180210151304.fonjztsynewldfba%40gmail.com
* Make COPY extendable in order to support Parquet and other formats:
  https://www.postgresql.org/message-id/flat/CAJ7c6TM6Bz1c3F04Cy6%2BSzuWfKmr0kU8c_3Stnvh_8BR0D6k8Q%40mail.gmail.com

This doesn't change the current behavior. This just introduces
CopyToFormatOps, which just has function pointers of format
implementation like TupleTableSlotOps, and use it for existing "text",
"csv" and "binary" format implementations.

Note that CopyToFormatOps can't be used from extensions yet because
CopySend*() aren't exported yet. Extensions can't send formatted data
to a destination without CopySend*(). They will be exported by
subsequent patches.

Here is a benchmark result with/without this change because there was
a discussion that we should care about performance regression:

https://www.postgresql.org/message-id/3741749.1655952719%40sss.pgh.pa.us

> I think that step 1 ought to be to convert the existing formats into
> plug-ins, and demonstrate that there's no significant loss of
> performance.

You can see that there is no significant loss of performance:

Data: Random 32 bit integers:

    CREATE TABLE data (int32 integer);
    INSERT INTO data
      SELECT random() * 10000
        FROM generate_series(1, ${n_records});

The number of records: 100K, 1M and 10M

100K without this change:

    format,elapsed time (ms)
    text,22.527
    csv,23.822
    binary,24.806

100K with this change:

    format,elapsed time (ms)
    text,22.919
    csv,24.643
    binary,24.705

1M without this change:

    format,elapsed time (ms)
    text,223.457
    csv,233.583
    binary,242.687

1M with this change:

    format,elapsed time (ms)
    text,224.591
    csv,233.964
    binary,247.164

10M without this change:

    format,elapsed time (ms)
    text,2330.383
    csv,2411.394
    binary,2590.817

10M with this change:

    format,elapsed time (ms)
    text,2231.307
    csv,2408.067
    binary,2473.617
---
 src/backend/commands/copy.c   |   8 +
 src/backend/commands/copyto.c | 387 +++++++++++++++++++++-------------
 src/include/commands/copy.h   |  27 ++-
 3 files changed, 266 insertions(+), 156 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..27a1add456 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate,
 	opts_out->file_encoding = -1;
+	/* Text is the default format. */
+	opts_out->to_ops = CopyToFormatOpsText;
 	/* Extract options from the statement node tree */
 	foreach(option, options)
 	{
@@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate,
 			if (strcmp(fmt, "text") == 0)
 				 /* default format */ ;
 			else if (strcmp(fmt, "csv") == 0)
+			{
 				opts_out->csv_mode = true;
+				opts_out->to_ops = CopyToFormatOpsCSV;
+			}
 			else if (strcmp(fmt, "binary") == 0)
+			{
 				opts_out->binary = true;
+				opts_out->to_ops = CopyToFormatOpsBinary;
+			}
 			else
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..295e96dbc5 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,238 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
+/*
+ * CopyToFormatOps implementations.
+ */
+
+/*
+ * CopyToFormatOps implementation for "text" and "csv". CopyToFormatText*()
+ * refer cstate->opts.csv_mode and change their behavior. We can split this
+ * implementation and stop referring cstate->opts.csv_mode later.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+	switch (cstate->copy_dest)
+	{
+		case COPY_FILE:
+			/* Default line termination depends on platform */
+#ifndef WIN32
+			CopySendChar(cstate, '\n');
+#else
+			CopySendString(cstate, "\r\n");
+#endif
+			break;
+		case COPY_FRONTEND:
+			/* The FE/BE protocol uses \n as newline for all platforms */
+			CopySendChar(cstate, '\n');
+			break;
+		default:
+			break;
+	}
+	CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	int			num_phys_attrs;
+	ListCell   *cur;
+
+	num_phys_attrs = tupDesc->natts;
+	/* Get info about the columns we need to process. */
+	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Oid			out_func_oid;
+		bool		isvarlena;
+		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+		getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+	}
+
+	/*
+	 * For non-binary copy, we need to convert null_print to file
+	 * encoding, because it will be sent directly with CopySendString.
+	 */
+	if (cstate->need_transcoding)
+		cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+														  cstate->opts.null_print_len,
+														  cstate->file_encoding);
+
+	/* if a header has been requested send the line */
+	if (cstate->opts.header_line)
+	{
+		bool		hdr_delim = false;
+
+		foreach(cur, cstate->attnumlist)
+		{
+			int			attnum = lfirst_int(cur);
+			char	   *colname;
+
+			if (hdr_delim)
+				CopySendChar(cstate, cstate->opts.delim[0]);
+			hdr_delim = true;
+
+			colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+			if (cstate->opts.csv_mode)
+				CopyAttributeOutCSV(cstate, colname, false,
+									list_length(cstate->attnumlist) == 1);
+			else
+				CopyAttributeOutText(cstate, colname);
+		}
+
+		CopyToFormatTextSendEndOfRow(cstate);
+	}
+}
+
+static void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	bool		need_delim = false;
+	FmgrInfo   *out_functions = cstate->out_functions;
+	ListCell   *cur;
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
+
+		if (need_delim)
+			CopySendChar(cstate, cstate->opts.delim[0]);
+		need_delim = true;
+
+		if (isnull)
+		{
+			CopySendString(cstate, cstate->opts.null_print_client);
+		}
+		else
+		{
+			char	   *string;
+
+			string = OutputFunctionCall(&out_functions[attnum - 1], value);
+			if (cstate->opts.csv_mode)
+				CopyAttributeOutCSV(cstate, string,
+									cstate->opts.force_quote_flags[attnum - 1],
+									list_length(cstate->attnumlist) == 1);
+			else
+				CopyAttributeOutText(cstate, string);
+		}
+	}
+
+	CopyToFormatTextSendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyToFormatOps implementation for "binary".
+ */
+
+static void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	int			num_phys_attrs;
+	ListCell   *cur;
+
+	num_phys_attrs = tupDesc->natts;
+	/* Get info about the columns we need to process. */
+	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Oid			out_func_oid;
+		bool		isvarlena;
+		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+		getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+	}
+
+	{
+		/* Generate header for a binary copy */
+		int32		tmp;
+
+		/* Signature */
+		CopySendData(cstate, BinarySignature, 11);
+		/* Flags field */
+		tmp = 0;
+		CopySendInt32(cstate, tmp);
+		/* No header extension */
+		tmp = 0;
+		CopySendInt32(cstate, tmp);
+	}
+}
+
+static void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	FmgrInfo   *out_functions = cstate->out_functions;
+	ListCell   *cur;
+
+	/* Binary per-tuple header */
+	CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
+
+		if (isnull)
+		{
+			CopySendInt32(cstate, -1);
+		}
+		else
+		{
+			bytea	   *outputbytes;
+
+			outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+			CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+			CopySendData(cstate, VARDATA(outputbytes),
+						 VARSIZE(outputbytes) - VARHDRSZ);
+		}
+	}
+
+	CopySendEndOfRow(cstate);
+}
+
+static void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+	/* Generate trailer for a binary copy */
+	CopySendInt16(cstate, -1);
+	/* Need to flush out the trailer */
+	CopySendEndOfRow(cstate);
+}
+
+const CopyToFormatOps CopyToFormatOpsText = {
+	.start = CopyToFormatTextStart,
+	.one_row = CopyToFormatTextOneRow,
+	.end = CopyToFormatTextEnd,
+};
+
+/*
+ * We can use the same CopyToFormatOps for both of "text" and "csv" because
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring
+ * cstate->opts.csv_mode later.
+ */
+const CopyToFormatOps CopyToFormatOpsCSV = CopyToFormatOpsText;
+
+const CopyToFormatOps CopyToFormatOpsBinary = {
+	.start = CopyToFormatBinaryStart,
+	.one_row = CopyToFormatBinaryOneRow,
+	.end = CopyToFormatBinaryEnd,
+};
 /*
  * Send copy start/stop messages for frontend copies. These have changed
@@ -198,16 +430,6 @@ CopySendEndOfRow(CopyToState cstate)
 	switch (cstate->copy_dest)
 	{
 		case COPY_FILE:
-			if (!cstate->opts.binary)
-			{
-				/* Default line termination depends on platform */
-#ifndef WIN32
-				CopySendChar(cstate, '\n');
-#else
-				CopySendString(cstate, "\r\n");
-#endif
-			}
-
 			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
 					   cstate->copy_file) != 1 ||
 				ferror(cstate->copy_file))
@@ -242,10 +464,6 @@ CopySendEndOfRow(CopyToState cstate)
 			}
 			break;
 		case COPY_FRONTEND:
-			/* The FE/BE protocol uses \n as newline for all platforms */
-			if (!cstate->opts.binary)
-				CopySendChar(cstate, '\n');
-
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
 			break;
@@ -748,8 +966,6 @@ DoCopyTo(CopyToState cstate)
 	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
-	int			num_phys_attrs;
-	ListCell   *cur;
 	uint64		processed;
 	if (fe_copy)
@@ -759,32 +975,11 @@ DoCopyTo(CopyToState cstate)
 		tupDesc = RelationGetDescr(cstate->rel);
 	else
 		tupDesc = cstate->queryDesc->tupDesc;
-	num_phys_attrs = tupDesc->natts;
 	cstate->opts.null_print_client = cstate->opts.null_print;	/* default */
 	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
 	cstate->fe_msgbuf = makeStringInfo();
-	/* Get info about the columns we need to process. */
-	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-	foreach(cur, cstate->attnumlist)
-	{
-		int			attnum = lfirst_int(cur);
-		Oid			out_func_oid;
-		bool		isvarlena;
-		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
-		if (cstate->opts.binary)
-			getTypeBinaryOutputInfo(attr->atttypid,
-									&out_func_oid,
-									&isvarlena);
-		else
-			getTypeOutputInfo(attr->atttypid,
-							  &out_func_oid,
-							  &isvarlena);
-		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-	}
-
 	/*
 	 * Create a temporary memory context that we can reset once per row to
 	 * recover palloc'd memory. This avoids any problems with leaks inside
@@ -795,57 +990,7 @@ DoCopyTo(CopyToState cstate)
 											   "COPY TO",
 											   ALLOCSET_DEFAULT_SIZES);
-	if (cstate->opts.binary)
-	{
-		/* Generate header for a binary copy */
-		int32		tmp;
-
-		/* Signature */
-		CopySendData(cstate, BinarySignature, 11);
-		/* Flags field */
-		tmp = 0;
-		CopySendInt32(cstate, tmp);
-		/* No header extension */
-		tmp = 0;
-		CopySendInt32(cstate, tmp);
-	}
-	else
-	{
-		/*
-		 * For non-binary copy, we need to convert null_print to file
-		 * encoding, because it will be sent directly with CopySendString.
-		 */
-		if (cstate->need_transcoding)
-			cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-															  cstate->opts.null_print_len,
-															  cstate->file_encoding);
-
-		/* if a header has been requested send the line */
-		if (cstate->opts.header_line)
-		{
-			bool		hdr_delim = false;
-
-			foreach(cur, cstate->attnumlist)
-			{
-				int			attnum = lfirst_int(cur);
-				char	   *colname;
-
-				if (hdr_delim)
-					CopySendChar(cstate, cstate->opts.delim[0]);
-				hdr_delim = true;
-
-				colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-				if (cstate->opts.csv_mode)
-					CopyAttributeOutCSV(cstate, colname, false,
-										list_length(cstate->attnumlist) == 1);
-				else
-					CopyAttributeOutText(cstate, colname);
-			}
-
-			CopySendEndOfRow(cstate);
-		}
-	}
+	cstate->opts.to_ops.start(cstate, tupDesc);
 	if (cstate->rel)
 	{
@@ -884,13 +1029,7 @@ DoCopyTo(CopyToState cstate)
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
-	if (cstate->opts.binary)
-	{
-		/* Generate trailer for a binary copy */
-		CopySendInt16(cstate, -1);
-		/* Need to flush out the trailer */
-		CopySendEndOfRow(cstate);
-	}
+	cstate->opts.to_ops.end(cstate);
 	MemoryContextDelete(cstate->rowcontext);
@@ -906,71 +1045,15 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-	bool		need_delim = false;
-	FmgrInfo   *out_functions = cstate->out_functions;
 	MemoryContext oldcontext;
-	ListCell   *cur;
-	char	   *string;
 	MemoryContextReset(cstate->rowcontext);
 	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
-	if (cstate->opts.binary)
-	{
-		/* Binary per-tuple header */
-		CopySendInt16(cstate, list_length(cstate->attnumlist));
-	}
-
 	/* Make sure the tuple is fully deconstructed */
 	slot_getallattrs(slot);
-	foreach(cur, cstate->attnumlist)
-	{
-		int			attnum = lfirst_int(cur);
-		Datum		value = slot->tts_values[attnum - 1];
-		bool		isnull = slot->tts_isnull[attnum - 1];
-
-		if (!cstate->opts.binary)
-		{
-			if (need_delim)
-				CopySendChar(cstate, cstate->opts.delim[0]);
-			need_delim = true;
-		}
-
-		if (isnull)
-		{
-			if (!cstate->opts.binary)
-				CopySendString(cstate, cstate->opts.null_print_client);
-			else
-				CopySendInt32(cstate, -1);
-		}
-		else
-		{
-			if (!cstate->opts.binary)
-			{
-				string = OutputFunctionCall(&out_functions[attnum - 1],
-											value);
-				if (cstate->opts.csv_mode)
-					CopyAttributeOutCSV(cstate, string,
-										cstate->opts.force_quote_flags[attnum - 1],
-										list_length(cstate->attnumlist) == 1);
-				else
-					CopyAttributeOutText(cstate, string);
-			}
-			else
-			{
-				bytea	   *outputbytes;
-
-				outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-											   value);
-				CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-				CopySendData(cstate, VARDATA(outputbytes),
-							 VARSIZE(outputbytes) - VARHDRSZ);
-			}
-		}
-	}
-
-	CopySendEndOfRow(cstate);
+	cstate->opts.to_ops.one_row(cstate, slot);
 	MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..6b5231b2f3 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,28 @@ typedef enum CopyHeaderChoice
 	COPY_HEADER_MATCH,
 } CopyHeaderChoice;
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY TO format implementation. */
+typedef struct CopyToFormatOps
+{
+	/* Called when COPY TO is started. This will send a header. */
+	void		(*start) (CopyToState cstate, TupleDesc tupDesc);
+
+	/* Copy one row for COPY TO. */
+	void		(*one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+	/* Called when COPY TO is ended. This will send a trailer. */
+	void		(*end) (CopyToState cstate);
+} CopyToFormatOps;
+
+/* Predefined CopyToFormatOps for "text", "csv" and "binary". */
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsText;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsCSV;
+extern PGDLLIMPORT const CopyToFormatOps CopyToFormatOpsBinary;
+
 /*
  * A struct to hold COPY options, in a parsed form. All of these are related
  * to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +85,9 @@ typedef struct CopyFormatOptions
 	bool	   *force_null_flags;	/* per-column CSV FN flags */
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select; /* list of column names (can be NIL) */
+	CopyToFormatOps to_ops;		/* how to format to */
 } CopyFormatOptions;
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
-- 
2.40.1
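
The start/one_row/end callback table in the patch can be illustrated outside the backend. The following is a minimal, self-contained sketch under loud assumptions: every Demo*/demo_* name is hypothetical, DemoCopyToState and DemoRow are simplified stand-ins for CopyToState and TupleTableSlot, and the "array" format is invented for the example. It is not PostgreSQL code and not an API proposed by the patch; it only shows how a driver can stay format-agnostic by calling through the function pointers.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for CopyToState: only an output buffer. */
typedef struct DemoCopyToState
{
	char		buf[256];
	size_t		len;
} DemoCopyToState;

/* Simplified stand-in for a TupleTableSlot holding int columns. */
typedef struct DemoRow
{
	const int  *values;
	int			nvalues;
} DemoRow;

/* Same shape as CopyToFormatOps in the patch, but over the demo types. */
typedef struct DemoCopyToFormatOps
{
	void		(*start) (DemoCopyToState *cstate);
	void		(*one_row) (DemoCopyToState *cstate, const DemoRow *row);
	void		(*end) (DemoCopyToState *cstate);
} DemoCopyToFormatOps;

/* Plays the role of CopySendString(): append to the output buffer. */
static void
demo_send_string(DemoCopyToState *cstate, const char *str)
{
	size_t		n = strlen(str);

	assert(cstate->len + n < sizeof(cstate->buf));
	memcpy(cstate->buf + cstate->len, str, n + 1);
	cstate->len += n;
}

static void
demo_array_start(DemoCopyToState *cstate)
{
	(void) cstate;				/* this invented format has no header */
}

/* Emit one row as "[v1,v2,...]" followed by a newline. */
static void
demo_array_one_row(DemoCopyToState *cstate, const DemoRow *row)
{
	char		num[32];
	int			i;

	demo_send_string(cstate, "[");
	for (i = 0; i < row->nvalues; i++)
	{
		if (i > 0)
			demo_send_string(cstate, ",");
		snprintf(num, sizeof(num), "%d", row->values[i]);
		demo_send_string(cstate, num);
	}
	demo_send_string(cstate, "]\n");
}

static void
demo_array_end(DemoCopyToState *cstate)
{
	(void) cstate;				/* this invented format has no trailer */
}

static const DemoCopyToFormatOps DemoCopyToFormatOpsArray = {
	.start = demo_array_start,
	.one_row = demo_array_one_row,
	.end = demo_array_end,
};

/* Mirrors how DoCopyTo() drives the callbacks: start, each row, end. */
static void
demo_copy_to(DemoCopyToState *cstate, const DemoCopyToFormatOps *ops,
			 const DemoRow *rows, int nrows)
{
	int			i;

	cstate->len = 0;
	cstate->buf[0] = '\0';
	ops->start(cstate);
	for (i = 0; i < nrows; i++)
		ops->one_row(cstate, &rows[i]);
	ops->end(cstate);
}
```

For two rows {1, 2} and {3}, demo_copy_to() leaves "[1,2]\n[3]\n" in cstate->buf. Adding another format in this sketch only means defining another ops table; the driver function stays unchanged, which is the property the refactoring relies on.
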