Hi, > > ERROR_LIMIT '<replaceable > class="parameter">limit_number</replaceable>' > > > > I think this should be: > > > > ERROR_LIMIT <replaceable class="parameter">limit_number</replaceable> > > > > (no single quote) > > Thank you. Fixed.
> More comments: > > - I think the document should state that if limit_number = 0, all > errors are immediately raised (behaves same as current behavior without > the patch). > > If we want all errors to be raised, limit_number does not need to be specified. But if it is specified as limit_number = 0, I think it is self-explanatory. > - "constraint violating rows will be returned back to the caller." > This does explain the current implementation. I am not sure if it's > intended or not though: > > cat /tmp/a > 1 1 > 2 2 > 3 3 > 3 4 > > psql test > $ psql test > psql (13devel) > Type "help" for help. > > test=# select * from t1; > i | j > ---+--- > 1 | 1 > 2 | 2 > 3 | 3 > (3 rows) > > test=# copy t1 from '/tmp/a' with (error_limit 1); > ERROR: duplicate key value violates unique constraint "t1_pkey" > DETAIL: Key (i)=(2) already exists. > CONTEXT: COPY t1, line 2: "2 2" > > So if the number of errors raised exceeds error_limit, no constraint > violating rows (in this case i=1, j=1) are returned. > error_limit specifies the number of errors allowed for the copy operation to proceed. If that number is exceeded, the operation is stopped. There may be more conflicts afterward, so returning a limited number of conflicting rows is not of much use. Regards, Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index a99f8155e4..c53e5f6d92 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable FORCE_NOT_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] ) ENCODING '<replaceable class="parameter">encoding_name</replaceable>' + ERROR_LIMIT <replaceable class="parameter">limit_number</replaceable> </synopsis> </refsynopsisdiv> @@ -355,6 +356,26 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>ERROR_LIMIT</literal></term> + <listitem> + <para> + Enables ignoring of errored-out rows, up to <replaceable + class="parameter">limit_number</replaceable> of them. If <replaceable + class="parameter">limit_number</replaceable> is set + to -1, then all errors will be ignored. + </para> + + <para> + Currently, only unique or exclusion constraint violations + and row formatting errors are ignored. Malformed + rows will raise warnings, while constraint-violating rows + will be returned to the caller. 
+ </para> + + </listitem> + </varlistentry> + <varlistentry> <term><literal>WHERE</literal></term> <listitem> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 40a8ec1abd..72225a85a0 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -24,6 +24,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "access/xlog.h" +#include "access/printtup.h" #include "catalog/dependency.h" #include "catalog/pg_authid.h" #include "catalog/pg_type.h" @@ -48,7 +49,9 @@ #include "port/pg_bswap.h" #include "rewrite/rewriteHandler.h" #include "storage/fd.h" +#include "storage/lmgr.h" #include "tcop/tcopprot.h" +#include "tcop/pquery.h" #include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -153,6 +156,7 @@ typedef struct CopyStateData List *convert_select; /* list of column names (can be NIL) */ bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ Node *whereClause; /* WHERE condition (or NULL) */ + int error_limit; /* total number of error to ignore */ /* these are just for error messages, see CopyFromErrorCallback */ const char *cur_relname; /* table name for error messages */ @@ -182,6 +186,9 @@ typedef struct CopyStateData bool volatile_defexprs; /* is any of defexprs volatile? */ List *range_table; ExprState *qualexpr; + bool ignore_error; /* is ignore error specified? */ + bool ignore_all_error; /* is error_limit -1 (ignore all error) + * specified? 
*/ TransitionCaptureState *transition_capture; @@ -836,7 +843,7 @@ CopyLoadRawBuf(CopyState cstate) void DoCopy(ParseState *pstate, const CopyStmt *stmt, int stmt_location, int stmt_len, - uint64 *processed) + uint64 *processed, DestReceiver *dest) { CopyState cstate; bool is_from = stmt->is_from; @@ -1068,7 +1075,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt, cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program, NULL, stmt->attlist, stmt->options); cstate->whereClause = whereClause; - *processed = CopyFrom(cstate); /* copy from file to database */ + *processed = CopyFrom(cstate, dest); /* copy from file to database */ EndCopyFrom(cstate); } else @@ -1290,6 +1297,18 @@ ProcessCopyOptions(ParseState *pstate, defel->defname), parser_errposition(pstate, defel->location))); } + else if (strcmp(defel->defname, "error_limit") == 0) + { + if (cstate->ignore_error) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("conflicting or redundant options"), + parser_errposition(pstate, defel->location))); + cstate->error_limit = defGetInt64(defel); + cstate->ignore_error = true; + if (cstate->error_limit == -1) + cstate->ignore_all_error = true; + } else ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), @@ -1440,6 +1459,10 @@ ProcessCopyOptions(ParseState *pstate, ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("CSV quote character must not appear in the NULL specification"))); + if (cstate->ignore_error && !cstate->is_copy_from) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("ERROR LIMIT only available using COPY FROM"))); } /* @@ -2653,7 +2676,7 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, * Copy FROM file to relation. 
*/ uint64 -CopyFrom(CopyState cstate) +CopyFrom(CopyState cstate, DestReceiver *dest) { ResultRelInfo *resultRelInfo; ResultRelInfo *target_resultRelInfo; @@ -2675,6 +2698,7 @@ CopyFrom(CopyState cstate) bool has_before_insert_row_trig; bool has_instead_insert_row_trig; bool leafpart_use_multi_insert = false; + Portal portal = NULL; Assert(cstate->rel); @@ -2838,7 +2862,19 @@ CopyFrom(CopyState cstate) /* Verify the named relation is a valid target for INSERT */ CheckValidResultRel(resultRelInfo, CMD_INSERT); - ExecOpenIndices(resultRelInfo, false); + if (cstate->ignore_error) + { + TupleDesc tupDesc; + + ExecOpenIndices(resultRelInfo, true); + tupDesc = RelationGetDescr(cstate->rel); + + portal = GetPortalByName(""); + SetRemoteDestReceiverParams(dest, portal); + dest->rStartup(dest, (int) CMD_SELECT, tupDesc); + } + else + ExecOpenIndices(resultRelInfo, false); estate->es_result_relations = resultRelInfo; estate->es_num_result_relations = 1; @@ -2943,6 +2979,13 @@ CopyFrom(CopyState cstate) */ insertMethod = CIM_SINGLE; } + else if (cstate->ignore_error) + { + /* + * Can't support speculative insertion in multi-inserts. + */ + insertMethod = CIM_SINGLE; + } else { /* @@ -3286,6 +3329,63 @@ CopyFrom(CopyState cstate) */ myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc); } + else if ((cstate->error_limit > 0 || cstate->ignore_all_error) && resultRelInfo->ri_NumIndices > 0) + { + /* Perform a speculative insertion. */ + uint32 specToken; + ItemPointerData conflictTid; + bool specConflict; + + /* + * Do a non-conclusive check for conflicts first. + */ + specConflict = false; + + if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid, + NIL)) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + + /* + * Acquire our speculative insertion lock. 
+ */ + specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId()); + + /* insert the tuple, with the speculative token */ + table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot, + estate->es_output_cid, + 0, + NULL, + specToken); + + /* insert index entries for tuple */ + recheckIndexes = ExecInsertIndexTuples(myslot, estate, true, + &specConflict, + NIL); + + /* adjust the tuple's state accordingly */ + table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot, + specToken, !specConflict); + + /* + * Wake up anyone waiting for our decision. + */ + SpeculativeInsertionLockRelease(GetCurrentTransactionId()); + + /* + * If there was a conflict, return it and preceded to + * the next record if there are any. + */ + if (specConflict) + { + (void) dest->receiveSlot(myslot, dest); + cstate->error_limit--; + continue; + } + } else { /* OK, store the tuple and create index entries for it */ @@ -3703,7 +3803,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); - +next_line: if (!cstate->binary) { char **field_strings; @@ -3718,9 +3818,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, /* check for overflowing fields */ if (attr_count > 0 && fldct > attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- extra data after last expected column", + cstate->line_buf.data))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + } fieldno = 0; @@ -3732,10 +3844,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, Form_pg_attribute att = 
TupleDescAttr(tupDesc, m); if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- missing data for column \"%s\"", + cstate->line_buf.data, NameStr(att->attname)))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + } string = field_strings[fieldno++]; if (cstate->convert_select_flags && @@ -3822,10 +3946,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext, } if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + { + if (cstate->error_limit > 0 || cstate->ignore_all_error) + { + ereport(WARNING, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("skipping \"%s\" --- row field count is %d, expected %d", + cstate->line_buf.data, (int) fld_count, attr_count))); + cstate->error_limit--; + goto next_line; + } + else + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + } i = 0; foreach(cur, cstate->attnumlist) diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index f8183cd488..817d0af002 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -784,7 +784,7 @@ copy_table(Relation rel) cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL); /* Do the copy */ - (void) CopyFrom(cstate); + (void) CopyFrom(cstate, NULL); logicalrep_rel_close(relmapentry, NoLock); } diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c index 
bb85b5e52a..746a2a5160 100644 --- a/src/backend/tcop/utility.c +++ b/src/backend/tcop/utility.c @@ -728,7 +728,7 @@ standard_ProcessUtility(PlannedStmt *pstmt, DoCopy(pstate, (CopyStmt *) parsetree, pstmt->stmt_location, pstmt->stmt_len, - &processed); + &processed, dest); if (completionTag) snprintf(completionTag, COMPLETION_TAG_BUFSIZE, "COPY " UINT64_FORMAT, processed); diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c index 67df0cd2c7..34869aaec6 100644 --- a/src/bin/psql/common.c +++ b/src/bin/psql/common.c @@ -892,6 +892,7 @@ ProcessResult(PGresult **results) { bool success = true; bool first_cycle = true; + bool is_copy_in = false; for (;;) { @@ -1015,6 +1016,7 @@ ProcessResult(PGresult **results) copystream, PQbinaryTuples(*results), ©_result) && success; + is_copy_in = true; } ResetCancelConn(); @@ -1045,6 +1047,11 @@ ProcessResult(PGresult **results) first_cycle = false; } + /* Print returned result for COPY FROM with error_limit. */ + if (is_copy_in && !success && PQresultStatus(*results) != + PGRES_FATAL_ERROR) + (void) PrintQueryTuples(*results); + SetResultVariables(*results, success); /* may need this to recover from conn loss during COPY */ diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index c639833565..addd8054d6 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -25,7 +25,7 @@ typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread); extern void DoCopy(ParseState *state, const CopyStmt *stmt, int stmt_location, int stmt_len, - uint64 *processed); + uint64 *processed, DestReceiver *dest); extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options); extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename, @@ -37,7 +37,7 @@ extern bool NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); -extern uint64 CopyFrom(CopyState cstate); +extern 
uint64 CopyFrom(CopyState cstate, DestReceiver *dest); extern DestReceiver *CreateCopyDestReceiver(void); diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out index e40287d25a..773e965970 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -55,6 +55,15 @@ LINE 1: COPY x TO stdout WHERE a = 1; ^ COPY x from stdin WHERE a = 50004; COPY x from stdin WHERE a > 60003; +COPY x from stdin WITH(ERROR_LIMIT 5); +WARNING: skipping "70001 22 32" --- missing data for column "d" +WARNING: skipping "70002 23 33 43 53 54" --- extra data after last expected column +WARNING: skipping "70003 24 34 44" --- missing data for column "e" + + a | b | c | d | e +---+---+---+---+--- +(0 rows) + COPY x from stdin WHERE f > 60003; ERROR: column "f" does not exist LINE 1: COPY x from stdin WHERE f > 60003; @@ -102,12 +111,14 @@ SELECT * FROM x; 50004 | 25 | 35 | 45 | before trigger fired 60004 | 25 | 35 | 45 | before trigger fired 60005 | 26 | 36 | 46 | before trigger fired + 70004 | 25 | 35 | 45 | before trigger fired + 70005 | 26 | 36 | 46 | before trigger fired 1 | 1 | stuff | test_1 | after trigger fired 2 | 2 | stuff | test_2 | after trigger fired 3 | 3 | stuff | test_3 | after trigger fired 4 | 4 | stuff | test_4 | after trigger fired 5 | 5 | stuff | test_5 | after trigger fired -(28 rows) +(30 rows) -- check copy out COPY x TO stdout; @@ -134,6 +145,8 @@ COPY x TO stdout; 50004 25 35 45 before trigger fired 60004 25 35 45 before trigger fired 60005 26 36 46 before trigger fired +70004 25 35 45 before trigger fired +70005 26 36 46 before trigger fired 1 1 stuff test_1 after trigger fired 2 2 stuff test_2 after trigger fired 3 3 stuff test_3 after trigger fired @@ -163,6 +176,8 @@ Delimiter before trigger fired 35 before trigger fired 35 before trigger fired 36 before trigger fired +35 before trigger fired +36 before trigger fired stuff after trigger fired stuff after trigger fired stuff after trigger fired @@ 
-192,6 +207,8 @@ I'm null before trigger fired 25 before trigger fired 25 before trigger fired 26 before trigger fired +25 before trigger fired +26 before trigger fired 1 after trigger fired 2 after trigger fired 3 after trigger fired diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index 902f4fac19..2378f428fc 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -110,6 +110,14 @@ COPY x from stdin WHERE a > 60003; 60005 26 36 46 56 \. +COPY x from stdin WITH(ERROR_LIMIT 5); +70001 22 32 +70002 23 33 43 53 54 +70003 24 34 44 +70004 25 35 45 55 +70005 26 36 46 56 +\. + COPY x from stdin WHERE f > 60003; COPY x from stdin WHERE a = max(x.b);