On Wed, Dec 20, 2023 at 8:27 PM Masahiko Sawada <sawada.m...@gmail.com> wrote: > > > Why do we need to use SPI? I think we can form heap tuples and insert > them to the error table. Creating the error table also doesn't need to > use SPI. > Thanks for pointing it out. I figured out how to form heap tuples and insert them to the error table. but I don't know how to create the error table without using SPI. Please pointer it out.
> > > > copy_errors one per schema. > > foo.copy_errors will be owned by the schema: foo owner. > > It seems that the error table is created when the SAVE_ERROR is used > for the first time. It probably blocks concurrent COPY FROM commands > with SAVE_ERROR option to different tables if the error table is not > created yet. > I don't know how to solve this problem.... Maybe we can document this. but it will block the COPY FROM immediately. > > > > if you can insert to a table in that specific schema let's say foo, > > then you will get privilege to INSERT/DELETE/SELECT > > to foo.copy_errors. > > If you are not a superuser, you are only allowed to do > > INSERT/DELETE/SELECT on foo.copy_errors rows where USERID = > > current_user::regrole::oid. > > This is done via row level security. > > I don't think it works. If the user is dropped, the user's oid could > be reused for a different user. > You are right. so I changed, now the schema owner will be the error table owner. every error table tuple inserts, I switch to schema owner, do the insert, then switch back to the COPY_FROM operation user. now everyone (except superuser) will need explicit grant to access the error table.
From 8c8c266f1dc809ffa0ec9f4262bdd912ed6b758a Mon Sep 17 00:00:00 2001 From: pgaddict <jian.universal...@gmail.com> Date: Wed, 27 Dec 2023 20:15:24 +0800 Subject: [PATCH v13 1/1] Make COPY FROM more error tolerant MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently COPY FROM has 3 types of error while processing the source file. * extra data after last expected column * missing data for column \"%s\" * data type conversion error. Instead of throwing errors while copying, save_error specifier will save errors to table copy_errors for all the copy from operation in the same schema. We check the existing copy_errors table definition by column name and column data type. if table already exists and meets the criteria then errors metadata will save to copy_errors. if the table does not exist, then create one. table copy_errors is per schema-wise, it's owned by the copy from operation destination schema's owner. The table owner has full privilege on copy_errors, other non-superuser need gain privilege to access it. Only works for COPY FROM, non-BINARY mode. --- doc/src/sgml/ref/copy.sgml | 121 ++++++++++++- src/backend/commands/copy.c | 12 ++ src/backend/commands/copyfrom.c | 133 +++++++++++++- src/backend/commands/copyfromparse.c | 217 +++++++++++++++++++++-- src/backend/parser/gram.y | 8 +- src/bin/psql/tab-complete.c | 3 +- src/include/commands/copy.h | 1 + src/include/commands/copyfrom_internal.h | 6 + src/include/parser/kwlist.h | 1 + src/test/regress/expected/copy2.out | 137 ++++++++++++++ src/test/regress/sql/copy2.sql | 123 +++++++++++++ 11 files changed, 746 insertions(+), 16 deletions(-) diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 18ecc69c..1d0ff0b6 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * } FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * } ENCODING '<replaceable class="parameter">encoding_name</replaceable>' + SAVE_ERROR [ <replaceable class="parameter">boolean</replaceable> ] </synopsis> </refsynopsisdiv> @@ -411,6 +412,18 @@ WHERE <replaceable class="parameter">condition</replaceable> </listitem> </varlistentry> + <varlistentry> + <term><literal>SAVE_ERROR</literal></term> + <listitem> + <para> + Specifies that any data conversion errors while copying will automatically saved in table <literal>COPY_ERRORS</literal> and the <command>COPY FROM</command> operation will not be interrupted by conversion errors. + This option is not allowed when using <literal>binary</literal> format. Note that this + is only supported in current <command>COPY FROM</command> syntax. + If this option is omitted, any data type conversion errors will be raised immediately. + </para> + </listitem> + </varlistentry> + </variablelist> </refsect1> @@ -564,6 +577,7 @@ COPY <replaceable class="parameter">count</replaceable> amount to a considerable amount of wasted disk space if the failure happened well into a large copy operation. You might wish to invoke <command>VACUUM</command> to recover the wasted space. + To continue copying while skip conversion errors in a <command>COPY FROM</command>, you might wish to specify <literal>SAVE_ERROR</literal>. </para> <para> @@ -572,6 +586,18 @@ COPY <replaceable class="parameter">count</replaceable> null strings to null values and unquoted null strings to empty strings. </para> + <para> + If the <literal>SAVE_ERROR</literal> option is specified and conversion errors occur while copying, + <productname>PostgreSQL</productname> will first check the table <literal>COPY_ERRORS</literal> existence, then save the conversion error related information to it. + If it does exist, but the actual table definition cannot use it to save the error information, an error is raised, <command>COPY FROM</command> operation stops. + If it does not exist, <productname>PostgreSQL</productname> will try to create it before doing the actual copy operation. + The table <literal>COPY_ERRORS</literal> owner is the current schema owner. + All the future errors related information generated while copying data to the same schema will automatically be saved to the same <literal>COPY_ERRORS</literal> table. + Currenly only the owner can read and write data to table <literal>COPY_ERRORS</literal>. + Conversion errors include data type conversion failure, extra data or missing data in the source file. + <literal>COPY_ERRORS</literal> table detailed description listed in <xref linkend="copy-errors-table"/>. + + </para> </refsect1> <refsect1> @@ -588,7 +614,7 @@ COPY <replaceable class="parameter">count</replaceable> output function, or acceptable to the input function, of each attribute's data type. The specified null string is used in place of columns that are null. - <command>COPY FROM</command> will raise an error if any line of the + By default, if <literal>SAVE_ERROR</literal> not specified, <command>COPY FROM</command> will raise an error if any line of the input file contains more or fewer columns than are expected. </para> @@ -962,6 +988,99 @@ versions of <productname>PostgreSQL</productname>. check against somehow getting out of sync with the data. </para> </refsect3> + + <refsect3> + <title> TABLE COPY_ERRORS </title> + <para> + If <literal>SAVE_ERROR</literal> specified, all the data type conversion errors while copying will automatically saved in <literal>COPY_ERRORS</literal> + <xref linkend="copy-errors-table"/> shows <literal>COPY_ERRORS</literal> table's column name, data type, and description. + </para> + + <table id="copy-errors-table"> + <title>Error Saving table description </title> + + <tgroup cols="3"> + <thead> + <row> + <entry>Column name</entry> + <entry>Data type</entry> + <entry>Description</entry> + </row> + </thead> + + <tbody> + <row> + <entry> <literal>userid</literal> </entry> + <entry><type>oid</type></entry> + <entry>The user generated the conversion error. + Refer <link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.<structfield>oid</structfield>. + There is no hard depenedency with <literal>pg_authid</literal>, if correspond <structfield>oid</structfield> deleted in <literal>pg_authid</literal>, it becomes stale. + </entry> + </row> + + <row> + <entry> <literal>copy_destination</literal> </entry> + <entry><type>oid</type></entry> + <entry>The <command>COPY FROM</command> operation destination table oid. + Refer <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>. + There is no hard depenedency with <literal>pg_class</literal> if correspond <structfield>oid</structfield> deleted in <literal>pg_class</literal>, it becomes stale. + + </entry> + </row> + + <row> + <entry> <literal>filename</literal> </entry> + <entry><type>text</type></entry> + <entry>The path name of the input filed</entry> + </row> + + <row> + <entry> <literal>lineno</literal> </entry> + <entry><type>bigint</type></entry> + <entry>Line number where the error occurred, counting from 1</entry> + </row> + + <row> + <entry> <literal>line</literal> </entry> + <entry><type>text</type></entry> + <entry>Raw content of the error occurred line</entry> + </row> + + <row> + <entry> <literal>colname</literal> </entry> + <entry><type>text</type></entry> + <entry>Field where the error occurred</entry> + </row> + + <row> + <entry> <literal>raw_field_value</literal> </entry> + <entry><type>text</type></entry> + <entry>Raw content of the error occurred field</entry> + </row> + + <row> + <entry> <literal>err_message </literal> </entry> + <entry><type>text</type></entry> + <entry>The error message text </entry> + </row> + + <row> + <entry> <literal>err_detail</literal> </entry> + <entry><type>text</type></entry> + <entry>Detailed error message </entry> + </row> + + <row> + <entry> <literal>errorcode </literal> </entry> + <entry><type>text</type></entry> + <entry>The error code for the copying error</entry> + </row> + + </tbody> + </tgroup> + </table> + </refsect3> + </refsect2> </refsect1> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index cfad47b5..bc4af10a 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -419,6 +419,7 @@ ProcessCopyOptions(ParseState *pstate, bool format_specified = false; bool freeze_specified = false; bool header_specified = false; + bool save_error_specified = false; ListCell *option; /* Support external use for option sanity checking */ @@ -458,6 +459,13 @@ ProcessCopyOptions(ParseState *pstate, freeze_specified = true; opts_out->freeze = defGetBoolean(defel); } + else if (strcmp(defel->defname, "save_error") == 0) + { + if (save_error_specified) + errorConflictingDefElem(defel, pstate); + save_error_specified = true; + opts_out->save_error = defGetBoolean(defel); + } else if (strcmp(defel->defname, "delimiter") == 0) { if (opts_out->delim) @@ -598,6 +606,10 @@ ProcessCopyOptions(ParseState *pstate, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("cannot specify DEFAULT in BINARY mode"))); + if (opts_out->binary && opts_out->save_error) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("cannot specify SAVE_ERROR in BINARY mode"))); /* Set defaults for omitted options */ if (!opts_out->delim) opts_out->delim = opts_out->csv_mode ? "," : "\t"; diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index f4861652..a972ad87 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -29,7 +29,9 @@ #include "access/tableam.h" #include "access/xact.h" #include "access/xlog.h" +#include "catalog/pg_authid.h" #include "catalog/namespace.h" +#include "catalog/pg_namespace.h" #include "commands/copy.h" #include "commands/copyfrom_internal.h" #include "commands/progress.h" @@ -38,6 +40,7 @@ #include "executor/executor.h" #include "executor/nodeModifyTable.h" #include "executor/tuptable.h" +#include "executor/spi.h" #include "foreign/fdwapi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -52,6 +55,7 @@ #include "utils/portal.h" #include "utils/rel.h" #include "utils/snapmgr.h" +#include "utils/syscache.h" /* * No more than this many tuples per CopyMultiInsertBuffer @@ -655,7 +659,8 @@ CopyFrom(CopyFromState cstate) Assert(cstate->rel); Assert(list_length(cstate->range_table) == 1); - + if (cstate->opts.save_error) + Assert(cstate->escontext); /* * The target must be a plain, foreign, or partitioned relation, or have * an INSTEAD OF INSERT row trigger. (Currently, such triggers are only @@ -992,6 +997,10 @@ CopyFrom(CopyFromState cstate) if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) break; + /* Soft error occured, skip this tuple. */ + if (cstate->opts.save_error && cstate->line_error_occured) + continue; + ExecStoreVirtualTuple(myslot); /* @@ -1297,6 +1306,20 @@ CopyFrom(CopyFromState cstate) ExecResetTupleTable(estate->es_tupleTable, false); + if (cstate->opts.save_error) + { + Assert(cstate->copy_errors_nspname); + + if (cstate->error_rows_cnt > 0) + { + ereport(NOTICE, + errmsg("%llu rows were skipped because of conversion error." + " Skipped rows saved to table %s.copy_errors", + (unsigned long long) cstate->error_rows_cnt, + cstate->copy_errors_nspname)); + } + } + /* Allow the FDW to shut down */ if (target_resultRelInfo->ri_FdwRoutine != NULL && target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL) @@ -1444,6 +1467,114 @@ BeginCopyFrom(ParseState *pstate, } } + /* Set up soft error handler for SAVE_ERROR */ + if (cstate->opts.save_error) + { + StringInfoData querybuf; + bool isnull; + bool copy_erros_table_ok; + Oid nsp_oid; + Oid save_userid; + Oid ownerId; + int save_sec_context; + const char *copy_errors_nspname; + HeapTuple tuple; + + cstate->escontext = makeNode(ErrorSaveContext); + cstate->escontext->type = T_ErrorSaveContext; + cstate->escontext->details_wanted = true; + cstate->escontext->error_occurred = false; + + copy_errors_nspname = get_namespace_name(RelationGetNamespace(cstate->rel)); + nsp_oid = get_namespace_oid(copy_errors_nspname, false); + + initStringInfo(&querybuf); + /* + * + * Verify whether the nsp_oid.COPY_ERRORS table already exists, and if so, + * examine its column names and data types. + */ + appendStringInfo(&querybuf, + "SELECT (array_agg(pa.attname ORDER BY pa.attnum) " + "= '{ctid,userid,copy_destination,filename,lineno, " + "line,colname,raw_field_value,err_message,err_detail,errorcode}') " + "AND (ARRAY_AGG(pt.typname ORDER BY pa.attnum) " + "= '{tid,oid,oid,text,int8,text,text,text,text,text,text}') " + "FROM pg_catalog.pg_attribute pa " + "JOIN pg_catalog.pg_class pc ON pc.oid = pa.attrelid " + "JOIN pg_catalog.pg_type pt ON pt.oid = pa.atttypid " + "JOIN pg_catalog.pg_namespace pn " + "ON pn.oid = pc.relnamespace WHERE "); + appendStringInfo(&querybuf, + "relname = $$copy_errors$$ AND pn.nspname = $$%s$$ " + " AND pa.attnum >= -1 AND NOT attisdropped ", + copy_errors_nspname); + + if (SPI_connect() != SPI_OK_CONNECT) + elog(ERROR, "SPI_connect failed"); + + if (SPI_execute(querybuf.data, false, 0) != SPI_OK_SELECT) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + copy_erros_table_ok = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0], + SPI_tuptable->tupdesc, + 1, &isnull)); + + tuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(nsp_oid)); + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_SCHEMA), + errmsg("schema with OID %u does not exist", nsp_oid))); + ownerId = ((Form_pg_namespace) GETSTRUCT(tuple))->nspowner; + ReleaseSysCache(tuple); + + cstate->copy_errors_owner = ownerId; + + /* + * Switch to the schema owner's userid, so that the COPY_ERRORS table owned by + * that user. + */ + GetUserIdAndSecContext(&save_userid, &save_sec_context); + + SetUserIdAndSecContext(ownerId, + save_sec_context | SECURITY_LOCAL_USERID_CHANGE | + SECURITY_NOFORCE_RLS); + + /* No copy_errors_nspname.COPY_ERRORS table then create it for holding all the potential error. */ + if (isnull) + { + resetStringInfo(&querybuf); + appendStringInfo(&querybuf, + "CREATE TABLE %s.COPY_ERRORS( " + "USERID OID, COPY_DESTINATION OID, FILENAME TEXT,LINENO BIGINT " + ",LINE TEXT, COLNAME text, RAW_FIELD_VALUE TEXT " + ",ERR_MESSAGE TEXT, ERR_DETAIL TEXT, ERRORCODE TEXT)", copy_errors_nspname); + + if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY) + elog(ERROR, "SPI_exec failed: %s", querybuf.data); + } + else if(!copy_erros_table_ok) + ereport(ERROR, + (errmsg("table %s.COPY_ERRORS already exists. " + "cannot use it for COPY FROM error saving", + copy_errors_nspname))); + + if (SPI_finish() != SPI_OK_FINISH) + elog(ERROR, "SPI_finish failed"); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + cstate->copy_errors_nspname = pstrdup(copy_errors_nspname); + } + else + { + cstate->copy_errors_nspname = NULL; + cstate->escontext = NULL; + cstate->copy_errors_owner = (Oid) 0; + } + + cstate->error_rows_cnt = 0; /* set the default to 0 */ + cstate->line_error_occured = false; /* default, assume conversion be ok. */ + /* Convert convert_selectively name list to per-column flags */ if (cstate->opts.convert_selectively) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index f5537345..f0849725 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -58,18 +58,21 @@ */ #include "postgres.h" +#include "access/heapam.h" #include <ctype.h> #include <unistd.h> #include <sys/stat.h> - +#include <catalog/namespace.h> #include "commands/copy.h" #include "commands/copyfrom_internal.h" #include "commands/progress.h" #include "executor/executor.h" +#include "executor/spi.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" #include "mb/pg_wchar.h" #include "miscadmin.h" +#include "nodes/miscnodes.h" #include "pgstat.h" #include "port/pg_bswap.h" #include "utils/builtins.h" @@ -880,16 +883,85 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, int fldct; int fieldno; char *string; + char *errmsg_extra; + Oid save_userid = InvalidOid; + int save_sec_context = -1; + HeapTuple copy_errors_tup; + Relation copy_errorsrel; + TupleDesc copy_errors_tupDesc; + Datum t_values[10]; + bool t_isnull[10]; /* read raw fields in the next line */ if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) return false; + if (cstate->opts.save_error) + { + /* + * Open the copy_errors relation. we also need current userid for the later heap inserts. + * + */ + copy_errorsrel = table_open(RelnameGetRelid("copy_errors"), RowExclusiveLock); + copy_errors_tupDesc = copy_errorsrel->rd_att; + GetUserIdAndSecContext(&save_userid, &save_sec_context); + } + + /* reset line_error_occured to false for next new line. */ + if (cstate->line_error_occured) + cstate->line_error_occured = false; + /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if(cstate->opts.save_error) + { + errmsg_extra = pstrdup("extra data after last expected column"); + t_values[0] = ObjectIdGetDatum(save_userid); + t_isnull[0] = false; + t_values[1] = ObjectIdGetDatum(cstate->rel->rd_rel->oid); + t_isnull[1] = false; + t_values[2] = CStringGetTextDatum( + cstate->filename ? cstate->filename : "STDIN"); + t_isnull[2] = false; + t_values[3] = Int64GetDatum((long long) cstate->cur_lineno); + t_isnull[3] = false; + t_values[4] = CStringGetTextDatum(cstate->line_buf.data); + t_isnull[4] = false; + t_values[5] = (Datum) 0; + t_isnull[5] = true; + t_values[6] = (Datum) 0; + t_isnull[6] = true; + t_values[7] = CStringGetTextDatum(errmsg_extra); + t_isnull[7] = false; + t_values[8] = (Datum) 0; + t_isnull[8] = true; + t_values[9] = CStringGetTextDatum( + unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT)); + t_isnull[9] = false; + + copy_errors_tup = heap_form_tuple(copy_errors_tupDesc, + t_values, + t_isnull); + + /* using copy_errors owner do the simple_heap_insert */ + SetUserIdAndSecContext(cstate->copy_errors_owner, + save_sec_context | SECURITY_LOCAL_USERID_CHANGE | + SECURITY_NOFORCE_RLS); + simple_heap_insert(copy_errorsrel, copy_errors_tup); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + cstate->line_error_occured = true; + cstate->error_rows_cnt++; + table_close(copy_errorsrel, RowExclusiveLock); + return true; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -901,10 +973,55 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Form_pg_attribute att = TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if(cstate->opts.save_error) + { + t_values[0] = ObjectIdGetDatum(save_userid); + t_isnull[0] = false; + t_values[1] = ObjectIdGetDatum(cstate->rel->rd_rel->oid); + t_isnull[1] = false; + t_values[2] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN"); + t_isnull[2] = false; + t_values[3] = Int64GetDatum((long long) cstate->cur_lineno); + t_isnull[3] = false; + t_values[4] = CStringGetTextDatum(cstate->line_buf.data); + t_isnull[4] = false; + t_values[5] = (Datum) 0; + t_isnull[5] = true; + t_values[6] = (Datum) 0; + t_isnull[6] = true; + t_values[7] = CStringGetTextDatum( + psprintf("missing data for column \"%s\"", NameStr(att->attname))); + t_isnull[7] = false; + t_values[8] = (Datum) 0; + t_isnull[8] = true; + t_values[9] = CStringGetTextDatum( + unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT)); + t_isnull[9] = false; + + copy_errors_tup = heap_form_tuple(copy_errors_tupDesc, + t_values, + t_isnull); + /* using copy_errors owner do the simple_heap_insert */ + SetUserIdAndSecContext(cstate->copy_errors_owner, + save_sec_context | SECURITY_LOCAL_USERID_CHANGE | + SECURITY_NOFORCE_RLS); + simple_heap_insert(copy_errorsrel, copy_errors_tup); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + cstate->line_error_occured = true; + cstate->error_rows_cnt++; + table_close(copy_errorsrel, RowExclusiveLock); + return true; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } + string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -956,15 +1073,91 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); } else - values[m] = InputFunctionCall(&in_functions[m], - string, - typioparams[m], - att->atttypmod); + { + /* + * + * InputFunctionCall is more faster than InputFunctionCallSafe. + * + */ + if(!cstate->opts.save_error) + values[m] = InputFunctionCall(&in_functions[m], + string, + typioparams[m], + att->atttypmod); + else + { + if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + char *err_detail; + char *err_code; + err_code = pstrdup(unpack_sql_state(cstate->escontext->error_data->sqlerrcode)); + if (!cstate->escontext->error_data->detail) + err_detail = NULL; + else + err_detail = cstate->escontext->error_data->detail; + + t_values[0] = ObjectIdGetDatum(save_userid); + t_isnull[0] = false; + t_values[1] = ObjectIdGetDatum(cstate->rel->rd_rel->oid); + t_isnull[1] = false; + t_values[2] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN"); + t_isnull[2] = false; + t_values[3] = Int64GetDatum((long long) cstate->cur_lineno); + t_isnull[3] = false; + t_values[4] = CStringGetTextDatum(cstate->line_buf.data); + t_isnull[4] = false; + t_values[5] = CStringGetTextDatum(cstate->cur_attname); + t_isnull[5] = false; + t_values[6] = CStringGetTextDatum(string); + t_isnull[6] = false; + t_values[7] = CStringGetTextDatum(cstate->escontext->error_data->message); + t_isnull[7] = false; + t_values[8] = err_detail ? CStringGetTextDatum(err_detail) : (Datum) 0; + t_isnull[8] = err_detail ? false: true; + t_values[9] = CStringGetTextDatum(err_code); + t_isnull[9] = false; + + copy_errors_tup = heap_form_tuple(copy_errors_tupDesc, + t_values, + t_isnull); + /* using copy_errors owner do the simple_heap_insert */ + SetUserIdAndSecContext(cstate->copy_errors_owner, + save_sec_context | SECURITY_LOCAL_USERID_CHANGE | + SECURITY_NOFORCE_RLS); + + simple_heap_insert(copy_errorsrel, copy_errors_tup); + + /* Restore userid and security context */ + SetUserIdAndSecContext(save_userid, save_sec_context); + + /* line error occured, set it once per line */ + if (!cstate->line_error_occured) + cstate->line_error_occured = true; + /* reset ErrorSaveContext */ + cstate->escontext->error_occurred = false; + cstate->escontext->details_wanted = true; + memset(cstate->escontext->error_data,0, sizeof(ErrorData)); + } + } + } cstate->cur_attname = NULL; cstate->cur_attval = NULL; } + /* record error rows count. */ + if (cstate->line_error_occured) + { + cstate->error_rows_cnt++; + Assert(cstate->opts.save_error); + } + if (cstate->opts.save_error) + table_close(copy_errorsrel, RowExclusiveLock); Assert(fieldno == attr_count); } else diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 63f172e1..f42e72aa 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -755,7 +755,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); RESET RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP ROUTINE ROUTINES ROW ROWS RULE - SAVEPOINT SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT + SAVEPOINT SAVE_ERROR SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT SEQUENCE SEQUENCES SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P @@ -3448,6 +3448,10 @@ copy_opt_item: { $$ = makeDefElem("encoding", (Node *) makeString($2), @1); } + | SAVE_ERROR + { + $$ = makeDefElem("save_error", (Node *) makeBoolean(true), @1); + } ; /* The following exist for backward compatibility with very old versions */ @@ -17346,6 +17350,7 @@ unreserved_keyword: | ROWS | RULE | SAVEPOINT + | SAVE_ERROR | SCALAR | SCHEMA | SCHEMAS @@ -17954,6 +17959,7 @@ bare_label_keyword: | ROWS | RULE | SAVEPOINT + | SAVE_ERROR | SCALAR | SCHEMA | SCHEMAS diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index 04980118..e6a358e0 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2890,7 +2890,8 @@ psql_completion(const char *text, int start, int end) else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(")) COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL", "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE", - "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT"); + "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT", + "SAVE_ERROR"); /* Complete COPY <sth> FROM|TO filename WITH (FORMAT */ else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT")) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index f2cca0b9..aa560dbb 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -43,6 +43,7 @@ typedef struct CopyFormatOptions bool binary; /* binary format? */ bool freeze; /* freeze rows on loading? */ bool csv_mode; /* Comma Separated Value format? */ + bool save_error; /* save error to a table? */ CopyHeaderChoice header_line; /* header line? */ char *null_print; /* NULL marker string (server encoding!) */ int null_print_len; /* length of same */ diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 5ec41589..2c3b7b42 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -16,6 +16,7 @@ #include "commands/copy.h" #include "commands/trigger.h" +#include "nodes/miscnodes.h" /* * Represents the different source cases we need to worry about at @@ -94,6 +95,11 @@ typedef struct CopyFromStateData * default value */ FmgrInfo *in_functions; /* array of input functions for each attrs */ Oid *typioparams; /* array of element types for in_functions */ + Oid copy_errors_owner; /* the owner of copy_errors table */ + ErrorSaveContext *escontext; /* soft error trapper during in_functions execution */ + uint64 error_rows_cnt; /* total number of rows that have errors */ + const char *copy_errors_nspname; /* the copy_errors's namespace */ + bool line_error_occured; /* does this line conversion error happened */ int *defmap; /* array of default att numbers related to * missing att */ ExprState **defexprs; /* array of default att expressions for all diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 5984dcfa..d0988a4c 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -377,6 +377,7 @@ PG_KEYWORD("routines", ROUTINES, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("row", ROW, COL_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("rows", ROWS, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("rule", RULE, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("save_error", SAVE_ERROR, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("savepoint", SAVEPOINT, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("scalar", SCALAR, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("schema", SCHEMA, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index c4178b9c..a2c6bf5aa 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -564,6 +564,118 @@ ERROR: conflicting or redundant options LINE 1: ... b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL... ^ ROLLBACK; +-- +-- tests for SAVE_ERROR option with force_not_null, force_null +\pset null NULL +CREATE TABLE save_error_csv( + a INT NOT NULL, + b TEXT NOT NULL, + c TEXT +); +--save_error not allowed in binary mode +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary); +ERROR: cannot specify SAVE_ERROR in BINARY mode +-- redundant options not allowed. +COPY save_error_csv FROM STDIN WITH (save_error, save_error off); +ERROR: conflicting or redundant options +LINE 1: COPY save_error_csv FROM STDIN WITH (save_error, save_error ... + ^ +create table COPY_ERRORS(); +--should fail. since table COPY_ERRORS already exists. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error); +ERROR: table public.COPY_ERRORS already exists. cannot use it for COPY FROM error saving +drop table COPY_ERRORS; +--with FORCE_NOT_NULL and FORCE_NULL. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c)); +NOTICE: 2 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors +SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv; + a | b | c | b_null | b_empty +---+---+------+--------+--------- + 2 | | NULL | f | t +(1 row) + +DROP TABLE save_error_csv; +-- save error with extra data and missing data some column. +---normal data type conversion error case. +CREATE TABLE check_ign_err (n int, m int[], k bigint, l text); +COPY check_ign_err FROM STDIN WITH (save_error); +NOTICE: 10 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors +select pc.relname, ce.filename,ce.lineno,ce.line,ce.colname, + ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode +from copy_errors ce join pg_class pc on pc.oid = ce.copy_destination +where pc.relname = 'check_ign_err'; + relname | filename | lineno | line | colname | raw_field_value | err_message | err_detail | errorcode +---------------+----------+--------+--------------------------------------------+---------+-------------------------+-----------------------------------------------------------------+---------------------------+----------- + check_ign_err | STDIN | 1 | 1 {1} 1 1 extra | NULL | NULL | extra data after last expected column | NULL | 22P04 + check_ign_err | STDIN | 2 | 2 | NULL | NULL | missing data for column "m" | NULL | 22P04 + check_ign_err | STDIN | 3 | \n {1} 1 \- | n | +| invalid input syntax for type integer: " +| NULL | 22P02 + | | | | | | " | | + check_ign_err | STDIN | 4 | a {2} 2 \r | n | a | invalid input syntax for type integer: "a" | NULL | 22P02 + check_ign_err | STDIN | 5 | 3 {\3} 3333333333 \n | m | {\x03} | invalid input syntax for type integer: "\x03" | NULL | 22P02 + check_ign_err | STDIN | 6 | 0x11 {3,} 3333333333 \\. | m | {3,} | malformed array literal: "{3,}" | Unexpected "}" character. | 22P02 + check_ign_err | STDIN | 7 | d {3,1/} 3333333333 \\0 | n | d | invalid input syntax for type integer: "d" | NULL | 22P02 + check_ign_err | STDIN | 7 | d {3,1/} 3333333333 \\0 | m | {3,1/} | invalid input syntax for type integer: "1/" | NULL | 22P02 + check_ign_err | STDIN | 8 | e {3,\1} -3323879289873933333333 \n | n | e | invalid input syntax for type integer: "e" | NULL | 22P02 + check_ign_err | STDIN | 8 | e {3,\1} -3323879289873933333333 \n | m | {3,\x01} | invalid input syntax for type integer: "\x01" | NULL | 22P02 + check_ign_err | STDIN | 8 | e {3,\1} -3323879289873933333333 \n | k | -3323879289873933333333 | value "-3323879289873933333333" is out of range for type bigint | NULL | 22003 + check_ign_err | STDIN | 9 | f {3,1} 3323879289873933333333 \r | n | f | invalid input syntax for type integer: "f" | NULL | 22P02 + check_ign_err | STDIN | 9 | f {3,1} 3323879289873933333333 \r | k | 3323879289873933333333 | value "3323879289873933333333" is out of range for type bigint | NULL | 22003 + check_ign_err | STDIN | 10 | b {a, 4} 1.1 h | n | b | invalid input syntax for type integer: "b" | NULL | 22P02 + check_ign_err | STDIN | 10 | b {a, 4} 1.1 h | m | {a, 4} | invalid input syntax for type integer: "a" | NULL | 22P02 + check_ign_err | STDIN | 10 | b {a, 4} 1.1 h | k | 1.1 | invalid input syntax for type bigint: "1.1" | NULL | 22P02 +(16 rows) + +DROP TABLE check_ign_err; +truncate COPY_ERRORS; +--(type textrange was already made in test_setup.sql) +--using textrange doing test +begin; +CREATE USER regress_user12; +CREATE USER regress_user13; +CREATE SCHEMA IF NOT EXISTS copy_errors_test AUTHORIZATION regress_user12; +SET LOCAL search_path TO copy_errors_test; +GRANT USAGE on schema copy_errors_test to regress_user12,regress_user13; +GRANT CREATE on schema copy_errors_test to regress_user12; +set role regress_user12; +CREATE TABLE textrange_input(a public.textrange, b public.textrange, c public.textrange); +GRANT insert on textrange_input to regress_user13; +set role regress_user13; +COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *); +NOTICE: 2 rows were skipped because of conversion error. Skipped rows saved to table copy_errors_test.copy_errors +SAVEPOINT s1; +--should fail. no priviledge +select * from copy_errors_test.copy_errors; +ERROR: permission denied for table copy_errors +ROLLBACK to s1; +set role regress_user12; +COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *); +NOTICE: 2 rows were skipped because of conversion error. Skipped rows saved to table copy_errors_test.copy_errors +SELECT pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname, + ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode +FROM copy_errors_test.copy_errors ce +JOIN pg_class pc ON pc.oid = ce.copy_destination +JOIN pg_roles pr ON pr.oid = ce.userid; + relname | rolname | filename | lineno | line | colname | raw_field_value | err_message | err_detail | errorcode +-----------------+----------------+----------+--------+----------------------------+---------+-----------------+-------------------------------------------------------------------+------------------------------------------+----------- + textrange_input | regress_user13 | STDIN | 1 | ,-[a\","z),[a","-inf) | b | -[a\,z) | malformed range literal: "-[a\,z)" | Missing left parenthesis or bracket. | 22P02 + textrange_input | regress_user13 | STDIN | 1 | ,-[a\","z),[a","-inf) | c | [a,-inf) | range lower bound must be less than or equal to range upper bound | NULL | 22000 + textrange_input | regress_user13 | STDIN | 2 | (",a),(",",a),()",a); | a | (,a),( | malformed range literal: "(,a),(" | Junk after right parenthesis or bracket. | 22P02 + textrange_input | regress_user13 | STDIN | 2 | (",a),(",",a),()",a); | b | ,a),() | malformed range literal: ",a),()" | Missing left parenthesis or bracket. | 22P02 + textrange_input | regress_user13 | STDIN | 2 | (",a),(",",a),()",a); | c | a); | malformed range literal: "a);" | Missing left parenthesis or bracket. | 22P02 + textrange_input | regress_user12 | STDIN | 1 | (a",")),(]","a),(a","]) | a | (a,)) | malformed range literal: "(a,))" | Junk after right parenthesis or bracket. | 22P02 + textrange_input | regress_user12 | STDIN | 1 | (a",")),(]","a),(a","]) | b | (],a) | malformed range literal: "(],a)" | Missing comma after lower bound. | 22P02 + textrange_input | regress_user12 | STDIN | 1 | (a",")),(]","a),(a","]) | c | (a,]) | malformed range literal: "(a,])" | Junk after right parenthesis or bracket. | 22P02 + textrange_input | regress_user12 | STDIN | 2 | [z","a],[z","2],[(","",")] | a | [z,a] | range lower bound must be less than or equal to range upper bound | NULL | 22000 + textrange_input | regress_user12 | STDIN | 2 | [z","a],[z","2],[(","",")] | b | [z,2] | range lower bound must be less than or equal to range upper bound | NULL | 22000 + textrange_input | regress_user12 | STDIN | 2 | [z","a],[z","2],[(","",")] | c | [(,",)] | malformed range literal: "[(,",)]" | Unexpected end of input. | 22P02 +(11 rows) + +--owner allowed to drop the table. +drop table copy_errors; +--should fail. no priviledge +select * from public.copy_errors; +ERROR: permission denied for table copy_errors +ROLLBACK; \pset null '' -- test case with whole-row Var in a check constraint create table check_con_tbl (f1 int); @@ -822,3 +934,28 @@ truncate copy_default; -- DEFAULT cannot be used in COPY TO copy (select 1 as test) TO stdout with (default '\D'); ERROR: COPY DEFAULT only available using COPY FROM +-- DEFAULT WITH SAVE_ERROR. +create table copy_default_error_save ( + id integer, + text_value text not null default 'test', + ts_value timestamp without time zone not null default '2022-07-05' +); +copy copy_default_error_save from stdin with (save_error, default '\D'); +NOTICE: 3 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors +select ce.filename,ce.lineno,ce.line, + ce.colname, ce.raw_field_value, + ce.err_message, ce.err_detail,ce.errorcode +from public.copy_errors ce +join pg_class pc on pc.oid = ce.copy_destination +where pc.relname = 'copy_default_error_save' +order by lineno, colname; + filename | lineno | line | colname | raw_field_value | err_message | err_detail | errorcode +----------+--------+----------------------------------+----------+------------------+-------------------------------------------------------------+------------+----------- + STDIN | 1 | k value '2022-07-04' | id | k | invalid input syntax for type integer: "k" | | 22P02 + STDIN | 2 | z \D '2022-07-03ASKL' | id | z | invalid input syntax for type integer: "z" | | 22P02 + STDIN | 2 | z \D '2022-07-03ASKL' | ts_value | '2022-07-03ASKL' | invalid input syntax for type timestamp: "'2022-07-03ASKL'" | | 22007 + STDIN | 3 | s \D \D | id | s | invalid input syntax for type integer: "s" | | 22P02 +(4 rows) + +drop table copy_default_error_save, copy_errors; +truncate copy_default; diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index a5486f60..a37986df 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -374,6 +374,106 @@ BEGIN; COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL(b)); ROLLBACK; +-- +-- tests for SAVE_ERROR option with force_not_null, force_null +\pset null NULL +CREATE TABLE save_error_csv( + a INT NOT NULL, + b TEXT NOT NULL, + c TEXT +); + +--save_error not allowed in binary mode +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary); + +-- redundant options not allowed. +COPY save_error_csv FROM STDIN WITH (save_error, save_error off); + +create table COPY_ERRORS(); +--should fail. since table COPY_ERRORS already exists. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error); + +drop table COPY_ERRORS; + +--with FORCE_NOT_NULL and FORCE_NULL. +COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c)); +z,,"" +\0,, +2,, +\. + +SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv; +DROP TABLE save_error_csv; + +-- save error with extra data and missing data some column. +---normal data type conversion error case. +CREATE TABLE check_ign_err (n int, m int[], k bigint, l text); +COPY check_ign_err FROM STDIN WITH (save_error); +1 {1} 1 1 extra +2 +\n {1} 1 \- +a {2} 2 \r +3 {\3} 3333333333 \n +0x11 {3,} 3333333333 \\. +d {3,1/} 3333333333 \\0 +e {3,\1} -3323879289873933333333 \n +f {3,1} 3323879289873933333333 \r +b {a, 4} 1.1 h +5 {5} 5 \\ +\. + +select pc.relname, ce.filename,ce.lineno,ce.line,ce.colname, + ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode +from copy_errors ce join pg_class pc on pc.oid = ce.copy_destination +where pc.relname = 'check_ign_err'; + +DROP TABLE check_ign_err; +truncate COPY_ERRORS; + +--(type textrange was already made in test_setup.sql) +--using textrange doing test +begin; +CREATE USER regress_user12; +CREATE USER regress_user13; +CREATE SCHEMA IF NOT EXISTS copy_errors_test AUTHORIZATION regress_user12; +SET LOCAL search_path TO copy_errors_test; + +GRANT USAGE on schema copy_errors_test to regress_user12,regress_user13; +GRANT CREATE on schema copy_errors_test to regress_user12; +set role regress_user12; +CREATE TABLE textrange_input(a public.textrange, b public.textrange, c public.textrange); +GRANT insert on textrange_input to regress_user13; + +set role regress_user13; +COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *); +,-[a\","z),[a","-inf) +(",a),(",",a),()",a); +\. + +SAVEPOINT s1; +--should fail. no priviledge +select * from copy_errors_test.copy_errors; + +ROLLBACK to s1; + +set role regress_user12; +COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *); +(a",")),(]","a),(a","]) +[z","a],[z","2],[(","",")] +\. + +SELECT pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname, + ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode +FROM copy_errors_test.copy_errors ce +JOIN pg_class pc ON pc.oid = ce.copy_destination +JOIN pg_roles pr ON pr.oid = ce.userid; + +--owner allowed to drop the table. +drop table copy_errors; + +--should fail. no priviledge +select * from public.copy_errors; +ROLLBACK; \pset null '' -- test case with whole-row Var in a check constraint @@ -609,3 +709,26 @@ truncate copy_default; -- DEFAULT cannot be used in COPY TO copy (select 1 as test) TO stdout with (default '\D'); + +-- DEFAULT WITH SAVE_ERROR. +create table copy_default_error_save ( + id integer, + text_value text not null default 'test', + ts_value timestamp without time zone not null default '2022-07-05' +); +copy copy_default_error_save from stdin with (save_error, default '\D'); +k value '2022-07-04' +z \D '2022-07-03ASKL' +s \D \D +\. + +select ce.filename,ce.lineno,ce.line, + ce.colname, ce.raw_field_value, + ce.err_message, ce.err_detail,ce.errorcode +from public.copy_errors ce +join pg_class pc on pc.oid = ce.copy_destination +where pc.relname = 'copy_default_error_save' +order by lineno, colname; + +drop table copy_default_error_save, copy_errors; +truncate copy_default; \ No newline at end of file -- 2.34.1