On Wed, Feb 20, 2019 at 7:04 PM Andres Freund <and...@anarazel.de> wrote:
> > > On February 20, 2019 6:05:53 AM PST, Andrew Dunstan < > andrew.duns...@2ndquadrant.com> wrote: > > > >On 2/20/19 8:01 AM, Surafel Temesgen wrote: > >> > >> > >> On Tue, Feb 19, 2019 at 3:47 PM Andres Freund <and...@anarazel.de > >> <mailto:and...@anarazel.de>> wrote: > >> > >> > >> > >> Err, what? Again, that requires super user permissions (in > >> contrast to copy from/to stdin/out). Backends run as the user > >> postgres runs under > >> > >> > >> > >> okay i see it now and modified the patch similarly > >> > >> > > > > > >Why log to a file at all? We do have, you know, a database handy, where > >we might more usefully log errors. You could usefully log the offending > >row as an array of text, possibly. > > Or even just return it as a row. CopyBoth is relatively widely supported > these days. > >

Hello, I think generating a warning about it also sufficiently meets its purpose of notifying the user about skipped records with the existing logging facility, and we use it for a similar purpose in other places too. The difference I see is the number of warnings that can be generated.

In addition to the above change, in the attached patch I also changed the syntax to ERROR LIMIT, because it no longer skips only unique and exclusion constraint violations.

Regards,
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 5e2992ddac..dc3b943279 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable FORCE_NOT_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ENCODING '<replaceable class="parameter">encoding_name</replaceable>' + ERROR_LIMIT '<replaceable class="parameter">limit_number</replaceable>' </synopsis> </refsynopsisdiv> @@ -355,6 +356,21 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>ERROR_LIMIT</literal></term> + <listitem> + <para> + Specifies to ignore error record up to <replaceable + class="parameter">limit_number</replaceable> number. + </para> + </listitem> + </varlistentry> + + <para> + Currently, only unique or exclusion constraint violation + and same record formatting error is ignored. 
+ </para> + <varlistentry> <term><literal>WHERE</literal></term> <listitem> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index f1161f0fee..05a5f29d4c 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -48,6 +48,7 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -154,6 +155,7 @@ typedef struct CopyStateData List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ Node *whereClause; /* WHERE condition (or NULL) */ + int error_limit; /* total number of error to ignore */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -1291,6 +1293,21 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "error_limit") == 0) + { + if (cstate->error_limit > 0) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->error_limit = defGetInt64(defel); + if (cstate->error_limit <= 0) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("argument to option \"%s\" must be positive integer", + defel->defname), + parser_errposition(pstate, defel->location))); + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1441,6 +1458,10 @@ ProcessCopyOptions(ParseState *pstate, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); + if (cstate->error_limit && !cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ERROR LIMIT only available using COPY FROM"))); } /* @@ -2837,7 +2858,10 @@ CopyFrom(CopyState cstate) /* Verify the named 
relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT); - ExecOpenIndices(resultRelInfo, false); + if (cstate->error_limit) + ExecOpenIndices(resultRelInfo, true); + else + ExecOpenIndices(resultRelInfo, false); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; @@ -2942,6 +2966,13 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->error_limit) + { + /* + * Can't support speculative insertion in multi-inserts. + */ + insertMethod = CIM_SINGLE; + } else { /* @@ -3285,13 +3316,79 @@ CopyFrom(CopyState cstate) */ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if (cstate->error_limit && resultRelInfo->ri_NumIndices > 0) + { + /* Perform a speculative insertion. */ + uint32 specToken; + ItemPointerData conflictTid; + bool specConflict; + + /* + * Do a non-conclusive check for conflicts first. + */ + specConflict = false; + + if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid, + NIL)) + { + ereport(WARNING, + (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("skipping \"%s\" --- violate exclusion or unique constraint", + cstate->line_buf.data))); + cstate->error_limit--; + continue; + } + + /* + * Acquire our speculative insertion lock". + */ + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + + /* insert the tuple, with the speculative token */ + table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot, + estate->es_output_cid, + 0, + NULL, + specToken); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(myslot, estate, true, + &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot, + specToken, !specConflict); + + /* + * Wake up anyone waiting for our decision. 
+ */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + /* + * If there was a conflict, warn about it and preceded + * to the next record if there are any. + */ + if (specConflict) + { + ereport(WARNING, + (errcode(ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION), + errmsg("skipping \"%s\" --- violate exclusion or unique constraint", + cstate->line_buf.data))); + cstate->error_limit--; + continue; + } + else + processed++; + + } else { /* OK, store the tuple and create index entries for it */ table_tuple_insert(resultRelInfo->ri_RelationDesc, myslot, mycid, ti_options, bistate); - if (resultRelInfo->ri_NumIndices > 0) + if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0) recheckIndexes = ExecInsertIndexTuples(myslot, estate, false, @@ -3312,7 +3409,8 @@ CopyFrom(CopyState cstate) * or FDW; this is the same definition used by nodeModifyTable.c * for counting tuples inserted by an INSERT command. */ - processed++; + if (!cstate->error_limit) + processed++; } } @@ -3703,7 +3801,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - +next_line: if (!cstate->binary) { char **field_strings; @@ -3718,9 +3816,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if (cstate->error_limit) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- extra data after last expected column ", + cstate->line_buf.data))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -3732,10 +3842,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, 
Form_pg_attribute att = TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if (cstate->error_limit) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- missing data for column \"%s\" ", + cstate->line_buf.data, NameStr(att->attname)))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\" ", + NameStr(att->attname)))); + } string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3822,10 +3944,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, } if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + { + if (cstate->error_limit) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- row field count is %d, expected %d ", + cstate->line_buf.data, (int) fld_count, attr_count))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + } i = 0; foreach(cur, cstate->attnumlist) diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index 8311b1dd46..35fde206a5 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -633,7 +633,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); DETACH DICTIONARY DISABLE_P DISCARD DISTINCT DO DOCUMENT_P DOMAIN_P DOUBLE_P DROP - EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ESCAPE EVENT EXCEPT + EACH ELSE ENABLE_P ENCODING ENCRYPTED END_P ENUM_P ERROR_P ESCAPE EVENT EXCEPT EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN EXTENSION EXTERNAL EXTRACT @@ -3054,6 +3054,10 @@ 
copy_opt_item: { $$ = makeDefElem("encoding", (Node *)makeString($2), @1); } + | ERROR_P LIMIT Iconst + { + $$ = makeDefElem("error_limit", (Node *)makeInteger($3), @1); + } ; /* The following exist for backward compatibility with very old versions */ @@ -15094,6 +15098,7 @@ unreserved_keyword: | ENCODING | ENCRYPTED | ENUM_P + | ERROR_P | ESCAPE | EVENT | EXCLUDE diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index 00ace8425e..1f4f154d19 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -146,6 +146,7 @@ PG_KEYWORD("encoding", ENCODING, UNRESERVED_KEYWORD) PG_KEYWORD("encrypted", ENCRYPTED, UNRESERVED_KEYWORD) PG_KEYWORD("end", END_P, RESERVED_KEYWORD) PG_KEYWORD("enum", ENUM_P, UNRESERVED_KEYWORD) +PG_KEYWORD("error", ERROR_P, UNRESERVED_KEYWORD) PG_KEYWORD("escape", ESCAPE, UNRESERVED_KEYWORD) PG_KEYWORD("event", EVENT, UNRESERVED_KEYWORD) PG_KEYWORD("except", EXCEPT, RESERVED_KEYWORD) diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index c53ed3ebf5..5421cbac4c 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -36,10 +36,10 @@ COPY x from stdin; ERROR: invalid input syntax for type integer: "" CONTEXT: COPY x, line 1, column a: "" COPY x from stdin; -ERROR: missing data for column "e" +ERROR: missing data for column "e" CONTEXT: COPY x, line 1: "2000 230 23 23" COPY x from stdin; -ERROR: missing data for column "e" +ERROR: missing data for column "e" CONTEXT: COPY x, line 1: "2001 231 \N \N" -- extra data: should fail COPY x from stdin; @@ -55,6 +55,10 @@ LINE 1: COPY x TO stdout WHERE a = 1; ^ COPY x from stdin WHERE a = 50004; COPY x from stdin WHERE a > 60003; +COPY x from stdin ERROR LIMIT 5; +WARNING: skipping "70001 22 32" --- missing data for column "d" +WARNING: skipping "70002 23 33 43 53 54" --- extra data after last expected column +WARNING: skipping "70003 24 34 44" --- missing data for column "e" COPY x from stdin 
WHERE f > 60003; ERROR: column "f" does not exist LINE 1: COPY x from stdin WHERE f > 60003; @@ -102,12 +106,14 @@ SELECT * FROM x; 50004 | 25 | 35 | 45 | before trigger fired 60004 | 25 | 35 | 45 | before trigger fired 60005 | 26 | 36 | 46 | before trigger fired + 70004 | 25 | 35 | 45 | before trigger fired + 70005 | 26 | 36 | 46 | before trigger fired 1 | 1 | stuff | test_1 | after trigger fired 2 | 2 | stuff | test_2 | after trigger fired 3 | 3 | stuff | test_3 | after trigger fired 4 | 4 | stuff | test_4 | after trigger fired 5 | 5 | stuff | test_5 | after trigger fired -(28 rows) +(30 rows) -- check copy out COPY x TO stdout; @@ -134,6 +140,8 @@ COPY x TO stdout; 50004 25 35 45 before trigger fired 60004 25 35 45 before trigger fired 60005 26 36 46 before trigger fired +70004 25 35 45 before trigger fired +70005 26 36 46 before trigger fired 1 1 stuff test_1 after trigger fired 2 2 stuff test_2 after trigger fired 3 3 stuff test_3 after trigger fired @@ -163,6 +171,8 @@ Delimiter before trigger fired 35 before trigger fired 35 before trigger fired 36 before trigger fired +35 before trigger fired +36 before trigger fired stuff after trigger fired stuff after trigger fired stuff after trigger fired @@ -192,6 +202,8 @@ I'm null before trigger fired 25 before trigger fired 25 before trigger fired 26 before trigger fired +25 before trigger fired +26 before trigger fired 1 after trigger fired 2 after trigger fired 3 after trigger fired diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index 902f4fac19..893bf215ed 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -110,6 +110,14 @@ COPY x from stdin WHERE a > 60003; 60005 26 36 46 56 \. +COPY x from stdin ERROR LIMIT 5; +70001 22 32 +70002 23 33 43 53 54 +70003 24 34 44 +70004 25 35 45 55 +70005 26 36 46 56 +\. + COPY x from stdin WHERE f > 60003; COPY x from stdin WHERE a = max(x.b);