>
From 09befdad45a6b1ae70d6c5abc90d1c2296e56ee1 Mon Sep 17 00:00:00 2001 From: Damir Belyalov <dam.be...@gmail.com> Date: Fri, 15 Oct 2021 11:55:18 +0300 Subject: [PATCH] COPY_IGNORE_ERRORS with GUC for replay_buffer size
--- doc/src/sgml/config.sgml | 17 ++ doc/src/sgml/ref/copy.sgml | 19 ++ src/backend/commands/copy.c | 8 + src/backend/commands/copyfrom.c | 114 +++++++++++- src/backend/commands/copyfromparse.c | 169 ++++++++++++++++++ src/backend/parser/gram.y | 8 +- src/backend/utils/misc/guc.c | 11 ++ src/backend/utils/misc/postgresql.conf.sample | 2 + src/bin/psql/tab-complete.c | 3 +- src/include/commands/copy.h | 6 + src/include/commands/copyfrom_internal.h | 19 ++ src/include/parser/kwlist.h | 1 + src/test/regress/expected/copy2.out | 130 ++++++++++++++ src/test/regress/sql/copy2.sql | 116 ++++++++++++ 14 files changed, 617 insertions(+), 6 deletions(-) diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml index 37fd80388c..69373b8d8c 100644 --- a/doc/src/sgml/config.sgml +++ b/doc/src/sgml/config.sgml @@ -1961,6 +1961,23 @@ include_dir 'conf.d' </listitem> </varlistentry> + <varlistentry id="guc-replay-buffer-size" xreflabel="replay_buffer_size"> + <term><varname>replay_buffer_size</varname> (<type>integer</type>) + <indexterm> + <primary><varname>replay_buffer_size</varname> configuration parameter</primary> + </indexterm> + </term> + <listitem> + <para> + Specifies the size of the buffer for the COPY FROM IGNORE_ERRORS option. This buffer + is created when a subtransaction begins and accumulates tuples until an error + occurs. Then it starts replaying the stored tuples. The buffer size is the size + of the subtransaction. Therefore, on large tables, in order to avoid the + error of the maximum number of subtransactions, it should be increased. 
+ </para> + </listitem> + </varlistentry> + <varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth"> <term><varname>max_stack_depth</varname> (<type>integer</type>) <indexterm> diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml index 8aae711b3b..7ff6f6dea7 100644 --- a/doc/src/sgml/ref/copy.sgml +++ b/doc/src/sgml/ref/copy.sgml @@ -34,6 +34,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable FORMAT <replaceable class="parameter">format_name</replaceable> FREEZE [ <replaceable class="parameter">boolean</replaceable> ] + IGNORE_ERRORS [ <replaceable class="parameter">boolean</replaceable> ] DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>' NULL '<replaceable class="parameter">null_string</replaceable>' HEADER [ <replaceable class="parameter">boolean</replaceable> | MATCH ] @@ -233,6 +234,24 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable </listitem> </varlistentry> + <varlistentry> + <term><literal>IGNORE_ERRORS</literal></term> + <listitem> + <para> + Drop rows that contain malformed data while copying. These are rows + containing syntax errors in data, rows with too many or too few columns, + rows that result in constraint violations, rows containing columns where + the data type's input function raises an error. + Outputs warnings about rows with incorrect data (the number of warnings + is not more than 100) and the total number of errors. + The option is implemented with subtransactions and a buffer created in + them, which accumulates tuples until an error occurs. + The size of the buffer is the size of the subtransaction block. + It is set by the GUC parameter "replay_buffer_size", which defaults to 1000. 
+ </para> + </listitem> + </varlistentry> + <varlistentry> <term><literal>DELIMITER</literal></term> <listitem> diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 3ac731803b..fead1aba46 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -402,6 +402,7 @@ ProcessCopyOptions(ParseState *pstate, { bool format_specified = false; bool freeze_specified = false; + bool ignore_errors_specified = false; bool header_specified = false; ListCell *option; @@ -442,6 +443,13 @@ ProcessCopyOptions(ParseState *pstate, freeze_specified = true; opts_out->freeze = defGetBoolean(defel); } + else if (strcmp(defel->defname, "ignore_errors") == 0) + { + if (ignore_errors_specified) + errorConflictingDefElem(defel, pstate); + ignore_errors_specified = true; + opts_out->ignore_errors = defGetBoolean(defel); + } else if (strcmp(defel->defname, "delimiter") == 0) { if (opts_out->delim) diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index a976008b3d..7e997d15c6 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -73,6 +73,11 @@ /* Trim the list of buffers back down to this number after flushing */ #define MAX_PARTITION_BUFFERS 32 +/* + * GUC parameters + */ +int replay_buffer_size; + /* Stores multi-insert data related to a single relation in CopyFrom. 
*/ typedef struct CopyMultiInsertBuffer { @@ -100,12 +105,13 @@ typedef struct CopyMultiInsertInfo int ti_options; /* table insert options */ } CopyMultiInsertInfo; - /* non-export function prototypes */ static char *limit_printout_length(const char *str); static void ClosePipeFromProgram(CopyFromState cstate); +static void safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate); + /* * error context callback for COPY FROM * @@ -521,6 +527,61 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri, miinfo->bufferedBytes += tuplen; } +/* + * ExecConstraints() wrapper for IGNORE_ERRORS. Any caught error rolls back the current subtransaction; constraint violations are then counted, reported as a WARNING (first 100 only) and flagged through skip_row, while every other error is re-thrown. + */ +static void +safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate) +{ + SafeCopyFromState *safecstate = cstate->safecstate; + + safecstate->skip_row = false; + + PG_TRY(); + ExecConstraints(resultRelInfo, myslot, estate); + PG_CATCH(); + { + ErrorData *errdata; + MemoryContext cxt; + + cxt = MemoryContextSwitchTo(safecstate->oldcontext); + errdata = CopyErrorData(); + + RollbackAndReleaseCurrentSubTransaction(); /* NOTE(review): unconditional; safeNextCopyFrom() releases its subtransaction before replaying rows -- confirm one is still open when a replayed row fails here */ + CurrentResourceOwner = safecstate->oldowner; + + switch (errdata->sqlerrcode) + { + /* Constraint violations: count, warn and make the caller skip the row */ + case ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION: + case ERRCODE_RESTRICT_VIOLATION: + case ERRCODE_NOT_NULL_VIOLATION: + case ERRCODE_FOREIGN_KEY_VIOLATION: + case ERRCODE_UNIQUE_VIOLATION: + case ERRCODE_CHECK_VIOLATION: + case ERRCODE_EXCLUSION_VIOLATION: + safecstate->errors++; + if (cstate->safecstate->errors <= 100) + ereport(WARNING, + (errcode(errdata->sqlerrcode), + errmsg("%s", errdata->context))); + + safecstate->begin_subtransaction = true; + safecstate->skip_row = true; + break; + default: + PG_RE_THROW(); + } + + FlushErrorState(); + FreeErrorData(errdata); + errdata = NULL; + + MemoryContextSwitchTo(cxt); + } + PG_END_TRY(); +} + /* * Copy FROM file to relation. 
*/ @@ -535,6 +596,7 @@ CopyFrom(CopyFromState cstate) ExprContext *econtext; TupleTableSlot *singleslot = NULL; MemoryContext oldcontext = CurrentMemoryContext; + ResourceOwner oldowner = CurrentResourceOwner; PartitionTupleRouting *proute = NULL; ErrorContextCallback errcallback; @@ -819,9 +881,30 @@ CopyFrom(CopyFromState cstate) errcallback.previous = error_context_stack; error_context_stack = &errcallback; + /* Initialize safeCopyFromState for IGNORE_ERRORS option */ + if (cstate->opts.ignore_errors) + { + cstate->safecstate = palloc(sizeof(SafeCopyFromState)); + + /* Create replay_buffer in oldcontext */ + cstate->safecstate->replay_buffer = (HeapTuple *) palloc(replay_buffer_size * sizeof(HeapTuple)); + + cstate->safecstate->saved_tuples = 0; + cstate->safecstate->replayed_tuples = 0; + cstate->safecstate->errors = 0; + cstate->safecstate->replay_is_active = false; + cstate->safecstate->begin_subtransaction = true; + cstate->safecstate->processed_remaining_tuples = false; + + cstate->safecstate->oldowner = oldowner; + cstate->safecstate->oldcontext = oldcontext; + cstate->safecstate->insertMethod = insertMethod; + } + for (;;) { TupleTableSlot *myslot; + bool valid_row; bool skip_tuple; CHECK_FOR_INTERRUPTS(); @@ -855,8 +938,21 @@ CopyFrom(CopyFromState cstate) ExecClearTuple(myslot); - /* Directly store the values/nulls array in the slot */ - if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull)) + /* + * NextCopyFrom() directly store the values/nulls array in the slot. + * safeNextCopyFrom() ignores rows with errors if IGNORE_ERRORS is enabled. 
+ */ + if (!cstate->safecstate) + valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull); + else + { + valid_row = safeNextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull); + + if (cstate->safecstate->skip_row) + continue; + } + + if (!valid_row) break; ExecStoreVirtualTuple(myslot); @@ -1035,7 +1131,17 @@ CopyFrom(CopyFromState cstate) */ if (resultRelInfo->ri_FdwRoutine == NULL && resultRelInfo->ri_RelationDesc->rd_att->constr) - ExecConstraints(resultRelInfo, myslot, estate); + { + if (cstate->opts.ignore_errors) + { + safeExecConstraints(cstate, resultRelInfo, myslot, estate); + + if (cstate->safecstate->skip_row) + continue; + } + else + ExecConstraints(resultRelInfo, myslot, estate); + } /* * Also check the tuple against the partition constraint, if diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index 57813b3458..1aae27d80d 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -1026,6 +1026,175 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, return true; } +/* + * Analog of NextCopyFrom() but skips rows with errors while copying. 
+ */ +bool +safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls) +{ + SafeCopyFromState *safecstate = cstate->safecstate; + bool valid_row = true; + + safecstate->skip_row = false; + + PG_TRY(); + { + if (!safecstate->replay_is_active) + { + if (safecstate->begin_subtransaction) + { + BeginInternalSubTransaction(NULL); + CurrentResourceOwner = safecstate->oldowner; + + safecstate->begin_subtransaction = false; + } + + if (safecstate->saved_tuples < replay_buffer_size) + { + valid_row = NextCopyFrom(cstate, econtext, values, nulls); + if (valid_row) + { + /* Fill replay_buffer in CopyFrom() oldcontext */ + MemoryContext cxt = MemoryContextSwitchTo(safecstate->oldcontext); + + safecstate->replay_buffer[safecstate->saved_tuples++] = heap_form_tuple(RelationGetDescr(cstate->rel), values, nulls); + MemoryContextSwitchTo(cxt); + + safecstate->skip_row = true; + } + else if (!safecstate->processed_remaining_tuples) + { + ReleaseCurrentSubTransaction(); + CurrentResourceOwner = safecstate->oldowner; + + if (safecstate->replayed_tuples < safecstate->saved_tuples) + { + /* Prepare to replay remaining tuples if they exist */ + safecstate->replay_is_active = true; + safecstate->processed_remaining_tuples = true; + safecstate->skip_row = true; + return true; + } + } + } + else + { + /* Buffer was filled, commit subtransaction and prepare to replay */ + ReleaseCurrentSubTransaction(); + CurrentResourceOwner = safecstate->oldowner; + + safecstate->replay_is_active = true; + safecstate->begin_subtransaction = true; + safecstate->skip_row = true; + } + } + else + { + if (safecstate->replayed_tuples < safecstate->saved_tuples) + /* Replaying tuple */ + heap_deform_tuple(safecstate->replay_buffer[safecstate->replayed_tuples++], RelationGetDescr(cstate->rel), values, nulls); + else + { + /* Clean up replay_buffer */ + MemSet(safecstate->replay_buffer, 0, replay_buffer_size * sizeof(HeapTuple)); + safecstate->saved_tuples = 
safecstate->replayed_tuples = 0; + + safecstate->replay_is_active = false; + safecstate->skip_row = true; + } + } + } + PG_CATCH(); + { + ErrorData *errdata; + MemoryContext cxt; + + cxt = MemoryContextSwitchTo(safecstate->oldcontext); + errdata = CopyErrorData(); + + RollbackAndReleaseCurrentSubTransaction(); + CurrentResourceOwner = safecstate->oldowner; + + switch (errdata->sqlerrcode) + { + /* Ignore malformed data */ + case ERRCODE_DATA_EXCEPTION: + case ERRCODE_ARRAY_ELEMENT_ERROR: + case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE: + case ERRCODE_INTERVAL_FIELD_OVERFLOW: + case ERRCODE_INVALID_CHARACTER_VALUE_FOR_CAST: + case ERRCODE_INVALID_DATETIME_FORMAT: + case ERRCODE_INVALID_ESCAPE_CHARACTER: + case ERRCODE_INVALID_ESCAPE_SEQUENCE: + case ERRCODE_NONSTANDARD_USE_OF_ESCAPE_CHARACTER: + case ERRCODE_INVALID_PARAMETER_VALUE: + case ERRCODE_INVALID_TABLESAMPLE_ARGUMENT: + case ERRCODE_INVALID_TIME_ZONE_DISPLACEMENT_VALUE: + case ERRCODE_NULL_VALUE_NOT_ALLOWED: + case ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE: + case ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED: + case ERRCODE_STRING_DATA_LENGTH_MISMATCH: + case ERRCODE_STRING_DATA_RIGHT_TRUNCATION: + case ERRCODE_INVALID_TEXT_REPRESENTATION: + case ERRCODE_INVALID_BINARY_REPRESENTATION: + case ERRCODE_BAD_COPY_FILE_FORMAT: + case ERRCODE_UNTRANSLATABLE_CHARACTER: + case ERRCODE_DUPLICATE_JSON_OBJECT_KEY_VALUE: + case ERRCODE_INVALID_ARGUMENT_FOR_SQL_JSON_DATETIME_FUNCTION: + case ERRCODE_INVALID_JSON_TEXT: + case ERRCODE_INVALID_SQL_JSON_SUBSCRIPT: + case ERRCODE_MORE_THAN_ONE_SQL_JSON_ITEM: + case ERRCODE_NO_SQL_JSON_ITEM: + case ERRCODE_NON_NUMERIC_SQL_JSON_ITEM: + case ERRCODE_NON_UNIQUE_KEYS_IN_A_JSON_OBJECT: + case ERRCODE_SINGLETON_SQL_JSON_ITEM_REQUIRED: + case ERRCODE_SQL_JSON_ARRAY_NOT_FOUND: + case ERRCODE_SQL_JSON_MEMBER_NOT_FOUND: + case ERRCODE_SQL_JSON_NUMBER_NOT_FOUND: + case ERRCODE_SQL_JSON_OBJECT_NOT_FOUND: + case ERRCODE_TOO_MANY_JSON_ARRAY_ELEMENTS: + case ERRCODE_TOO_MANY_JSON_OBJECT_MEMBERS: + case 
ERRCODE_SQL_JSON_SCALAR_REQUIRED: + case ERRCODE_SQL_JSON_ITEM_CANNOT_BE_CAST_TO_TARGET_TYPE: + safecstate->errors++; + if (safecstate->errors <= 100) + ereport(WARNING, + (errcode(errdata->sqlerrcode), + errmsg("%s", errdata->context))); + + safecstate->begin_subtransaction = true; + safecstate->skip_row = true; + break; + default: + PG_RE_THROW(); + } + + FlushErrorState(); + FreeErrorData(errdata); + errdata = NULL; + + MemoryContextSwitchTo(cxt); + } + PG_END_TRY(); + + if (!valid_row) + { + if (safecstate->errors == 0) + ereport(NOTICE, + errmsg("FIND %d ERRORS", safecstate->errors)); + else if (safecstate->errors == 1) + ereport(WARNING, + errmsg("FIND %d ERROR", safecstate->errors)); + else + ereport(WARNING, + errmsg("FIND %d ERRORS", safecstate->errors)); + + return false; + } + + return true; +} + /* * Read the next input line and stash it in line_buf. * diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y index df5ceea910..3bb7235b34 100644 --- a/src/backend/parser/gram.y +++ b/src/backend/parser/gram.y @@ -800,7 +800,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query); HANDLER HAVING HEADER_P HOLD HOUR_P - IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE + IDENTITY_P IF_P IGNORE_ERRORS ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION @@ -3456,6 +3456,10 @@ copy_opt_item: { $$ = makeDefElem("freeze", (Node *) makeBoolean(true), @1); } + | IGNORE_ERRORS + { + $$ = makeDefElem("ignore_errors", (Node *) makeBoolean(true), @1); /* boolean node, same as the FREEZE rule */ + } | DELIMITER opt_as Sconst { $$ = makeDefElem("delimiter", (Node *) makeString($3), @1); @@ -17814,6 +17818,7 @@ unreserved_keyword: | HOUR_P | IDENTITY_P | IF_P + | IGNORE_ERRORS | IMMEDIATE | IMMUTABLE | IMPLICIT_P @@ -18393,6 +18398,7 @@ bare_label_keyword: | HOLD | 
IDENTITY_P | IF_P + | IGNORE_ERRORS | ILIKE | IMMEDIATE | IMMUTABLE diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 0328029d43..54209a4a3c 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -49,6 +49,7 @@ #include "catalog/pg_parameter_acl.h" #include "catalog/storage.h" #include "commands/async.h" +#include "commands/copy.h" #include "commands/prepare.h" #include "commands/tablespace.h" #include "commands/trigger.h" @@ -2527,6 +2528,16 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, + { + {"replay_buffer_size", PGC_USERSET, RESOURCES_MEM, + gettext_noop("Sets the size of replay buffer for COPY FROM IGNORE_ERRORS option"), + NULL + }, + &replay_buffer_size, + 1000, 1, INT_MAX, + NULL, NULL, NULL + }, + /* * We use the hopefully-safely-small value of 100kB as the compiled-in * default for max_stack_depth. InitializeGUCOptions will increase it if diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index b4bc06e5f5..f4e777a0a3 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -155,6 +155,8 @@ # mmap # (change requires restart) #min_dynamic_shared_memory = 0MB # (change requires restart) +#replay_buffer_size = 1000 # min 1 + # (rows buffered per subtransaction by COPY ... IGNORE_ERRORS) # - Disk - diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c index e572f585ef..feaf18b043 100644 --- a/src/bin/psql/tab-complete.c +++ b/src/bin/psql/tab-complete.c @@ -2742,7 +2742,8 @@ psql_completion(const char *text, int start, int end) else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(")) COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL", "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE", - "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING"); + "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", + "IGNORE_ERRORS"); /* Complete COPY <sth> FROM|TO filename WITH (FORMAT */ else if 
(Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT")) diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h index cb0096aeb6..fc9f559efe 100644 --- a/src/include/commands/copy.h +++ b/src/include/commands/copy.h @@ -19,6 +19,9 @@ #include "parser/parse_node.h" #include "tcop/dest.h" +/* User-settable GUC parameters */ +extern PGDLLIMPORT int replay_buffer_size; + /* * Represents whether a header line should be present, and whether it must * match the actual names (which implies "true"). @@ -42,6 +45,7 @@ typedef struct CopyFormatOptions * -1 if not specified */ bool binary; /* binary format? */ bool freeze; /* freeze rows on loading? */ + bool ignore_errors; /* ignore rows with errors */ bool csv_mode; /* Comma Separated Value format? */ CopyHeaderChoice header_line; /* header line? */ char *null_print; /* NULL marker string (server encoding!) */ @@ -78,6 +82,8 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where extern void EndCopyFrom(CopyFromState cstate); extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls); +extern bool safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls); extern bool NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields); extern void CopyFromErrorCallback(void *arg); diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h index 3df1c5a97c..4227a7babd 100644 --- a/src/include/commands/copyfrom_internal.h +++ b/src/include/commands/copyfrom_internal.h @@ -16,6 +16,7 @@ #include "commands/copy.h" #include "commands/trigger.h" +#include "utils/resowner.h" /* * Represents the different source cases we need to worry about at @@ -49,6 +50,23 @@ typedef enum CopyInsertMethod CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */ } CopyInsertMethod; + /* Per-COPY state used by the COPY FROM IGNORE_ERRORS option. 
*/ +typedef struct SafeCopyFromState +{ + HeapTuple *replay_buffer; /* tuples saved while reading, handed back to the caller during replay */ + int saved_tuples; /* # of tuples in replay_buffer */ + int replayed_tuples; /* # of tuples already replayed from the buffer */ + int errors; /* total # of errors */ + bool replay_is_active; /* true while buffered tuples are being replayed (buffer full or end of input) */ + bool begin_subtransaction; /* true if a new subtransaction must be started before reading the next row */ + bool processed_remaining_tuples; /* set at end of input once the remaining buffered tuples are queued for replay */ + bool skip_row; /* true if the caller must skip the current row */ + + MemoryContext oldcontext; /* CopyFrom() memory context; replay_buffer tuples are allocated there */ + ResourceOwner oldowner; /* CopyFrom() resource owner */ + CopyInsertMethod insertMethod; /* insert method selected in CopyFrom() */ +} SafeCopyFromState; + /* * This struct contains all the state variables used throughout a COPY FROM * operation. @@ -71,6 +89,7 @@ typedef struct CopyFromStateData char *filename; /* filename, or NULL for STDIN */ bool is_program; /* is 'filename' a program to popen? */ copy_data_source_cb data_source_cb; /* function for reading data */ + SafeCopyFromState *safecstate; /* struct for ignore_errors option */ CopyFormatOptions opts; bool *convert_select_flags; /* per-column CSV/TEXT CS flags */ diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h index ae35f03251..2af11bd359 100644 --- a/src/include/parser/kwlist.h +++ b/src/include/parser/kwlist.h @@ -201,6 +201,7 @@ PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD, AS_LABEL) PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD, BARE_LABEL) +PG_KEYWORD("ignore_errors", IGNORE_ERRORS, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL) PG_KEYWORD("immediate", IMMEDIATE, UNRESERVED_KEYWORD, BARE_LABEL) PG_KEYWORD("immutable", IMMUTABLE, UNRESERVED_KEYWORD, BARE_LABEL) diff --git a/src/test/regress/expected/copy2.out 
b/src/test/regress/expected/copy2.out index 5f3685e9ef..093c7958be 100644 --- a/src/test/regress/expected/copy2.out +++ b/src/test/regress/expected/copy2.out @@ -649,6 +649,136 @@ SELECT * FROM instead_of_insert_tbl; (2 rows) COMMIT; +-- tests for IGNORE_ERRORS option +-- CIM_MULTI case +CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int); +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS; +WARNING: COPY check_ign_err, line 2: "2 {2} 2 2" +WARNING: COPY check_ign_err, line 3: "3 {3}" +WARNING: COPY check_ign_err, line 4, column n: "a" +WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555" +WARNING: COPY check_ign_err, line 6, column n: "" +WARNING: COPY check_ign_err, line 7, column m: "{a, 7}" +WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8" +WARNING: FIND 7 ERRORS +SELECT * FROM check_ign_err; + n | m | k +---+-----+--- + 1 | {1} | 1 +(1 row) + +-- CIM_SINGLE cases +-- BEFORE row trigger +TRUNCATE check_ign_err; +CREATE TABLE trig_test(n int, m int[]); +CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS ' + BEGIN + INSERT INTO trig_test VALUES(NEW.n, NEW.m); + RETURN NEW; + END; +' LANGUAGE plpgsql; +CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err +FOR EACH ROW EXECUTE PROCEDURE fn_trig_before(); +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS; +WARNING: COPY check_ign_err, line 2: "2 {2} 2 2" +WARNING: COPY check_ign_err, line 3: "3 {3}" +WARNING: COPY check_ign_err, line 4, column n: "a" +WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555" +WARNING: COPY check_ign_err, line 6, column n: "" +WARNING: COPY check_ign_err, line 7, column m: "{a, 7}" +WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8" +WARNING: FIND 7 ERRORS +SELECT * FROM check_ign_err; + n | m | k +---+-----+--- + 1 | {1} | 1 +(1 row) + +DROP TRIGGER trig_before on check_ign_err; +-- INSTEAD OF row trigger +TRUNCATE check_ign_err; +TRUNCATE trig_test; +CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err; +CREATE FUNCTION 
fn_trig_instead_of () RETURNS TRIGGER AS ' + BEGIN + INSERT INTO check_ign_err VALUES(NEW.n, NEW.m, NEW.k); + RETURN NEW; + END; +' LANGUAGE plpgsql; +CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view +FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of(); +COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS; +WARNING: COPY check_ign_err_view, line 2: "2 {2} 2 2" +WARNING: COPY check_ign_err_view, line 3: "3 {3}" +WARNING: COPY check_ign_err_view, line 4, column n: "a" +WARNING: COPY check_ign_err_view, line 5, column n: "5 {5} 5555555555" +WARNING: COPY check_ign_err_view, line 6, column n: "" +WARNING: COPY check_ign_err_view, line 7, column m: "{a, 7}" +WARNING: COPY check_ign_err_view, line 8, column n: "8 {8} 8" +WARNING: FIND 7 ERRORS +SELECT * FROM check_ign_err_view; + n | m | k +---+-----+--- + 1 | {1} | 1 +(1 row) + +DROP TRIGGER trig_instead_of ON check_ign_err_view; +DROP VIEW check_ign_err_view; +-- foreign table case in postgres_fdw extension +-- volatile function in WHERE clause +TRUNCATE check_ign_err; +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS + WHERE n = floor(random()*(1-1+1))+1; /* find values equal 1 */ +WARNING: COPY check_ign_err, line 2: "2 {2} 2 2" +WARNING: COPY check_ign_err, line 3: "3 {3}" +WARNING: COPY check_ign_err, line 4, column n: "a" +WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555" +WARNING: COPY check_ign_err, line 6, column n: "" +WARNING: COPY check_ign_err, line 7, column m: "{a, 7}" +WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8" +WARNING: FIND 7 ERRORS +SELECT * FROM check_ign_err; + n | m | k +---+-----+--- + 1 | {1} | 1 +(1 row) + +DROP TABLE check_ign_err; +-- CIM_MULTI_CONDITIONAL case +-- INSERT triggers for partition tables +TRUNCATE trig_test; +CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int) + PARTITION BY RANGE (k); +CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err + FOR VALUES FROM (1) TO (4); +CREATE TABLE check_ign_err_part2 
PARTITION OF check_ign_err + FOR VALUES FROM (4) TO (8); +CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS ' + BEGIN + INSERT INTO trig_test VALUES(NEW.n, NEW.m); + RETURN NEW; + END; +' LANGUAGE plpgsql; +CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err +FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part(); +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS; +WARNING: COPY check_ign_err, line 2: "2 {2} 2 2" +WARNING: COPY check_ign_err, line 3: "3 {3}" +WARNING: COPY check_ign_err, line 4, column n: "a" +WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555" +WARNING: COPY check_ign_err, line 6, column n: "" +WARNING: COPY check_ign_err, line 7, column m: "{a, 7}" +WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8" +WARNING: FIND 7 ERRORS +SELECT * FROM check_ign_err; + n | m | k +---+-----+--- + 1 | {1} | 1 +(1 row) + +DROP TRIGGER trig_before_part on check_ign_err; +DROP TABLE trig_test; +DROP TABLE check_ign_err CASCADE; -- clean up DROP TABLE forcetest; DROP TABLE vistest; diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql index b3c16af48e..c91122aa1e 100644 --- a/src/test/regress/sql/copy2.sql +++ b/src/test/regress/sql/copy2.sql @@ -454,6 +454,122 @@ test1 SELECT * FROM instead_of_insert_tbl; COMMIT; +-- tests for IGNORE_ERRORS option +-- CIM_MULTI case +CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int); +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS; +1 {1} 1 +2 {2} 2 2 +3 {3} +a {4} 4 +5 {5} 5555555555 + +7 {a, 7} 7 +8 {8} 8 +\. 
+SELECT * FROM check_ign_err; + +-- CIM_SINGLE cases +-- BEFORE row trigger +TRUNCATE check_ign_err; +CREATE TABLE trig_test(n int, m int[]); +CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS ' + BEGIN + INSERT INTO trig_test VALUES(NEW.n, NEW.m); + RETURN NEW; + END; +' LANGUAGE plpgsql; +CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err +FOR EACH ROW EXECUTE PROCEDURE fn_trig_before(); +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS; +1 {1} 1 +2 {2} 2 2 +3 {3} +a {4} 4 +5 {5} 5555555555 + +7 {a, 7} 7 +8 {8} 8 +\. +SELECT * FROM check_ign_err; +DROP TRIGGER trig_before on check_ign_err; + +-- INSTEAD OF row trigger +TRUNCATE check_ign_err; +TRUNCATE trig_test; +CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err; +CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS ' + BEGIN + INSERT INTO check_ign_err VALUES(NEW.n, NEW.m, NEW.k); + RETURN NEW; + END; +' LANGUAGE plpgsql; +CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view +FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of(); +COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS; +1 {1} 1 +2 {2} 2 2 +3 {3} +a {4} 4 +5 {5} 5555555555 + +7 {a, 7} 7 +8 {8} 8 +\. +SELECT * FROM check_ign_err_view; +DROP TRIGGER trig_instead_of ON check_ign_err_view; +DROP VIEW check_ign_err_view; + +-- foreign table case in postgres_fdw extension + +-- volatile function in WHERE clause +TRUNCATE check_ign_err; +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS + WHERE n = floor(random()*(1-1+1))+1; /* find values equal 1 */ +1 {1} 1 +2 {2} 2 2 +3 {3} +a {4} 4 +5 {5} 5555555555 + +7 {a, 7} 7 +8 {8} 8 +\. 
+SELECT * FROM check_ign_err; +DROP TABLE check_ign_err; + +-- CIM_MULTI_CONDITIONAL case +-- INSERT triggers for partition tables +TRUNCATE trig_test; +CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int) + PARTITION BY RANGE (k); +CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err + FOR VALUES FROM (1) TO (4); +CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err + FOR VALUES FROM (4) TO (8); +CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS ' + BEGIN + INSERT INTO trig_test VALUES(NEW.n, NEW.m); + RETURN NEW; + END; +' LANGUAGE plpgsql; +CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err +FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part(); +COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS; +1 {1} 1 +2 {2} 2 2 +3 {3} +a {4} 4 +5 {5} 5555555555 + +7 {a, 7} 7 +8 {8} 8 +\. +SELECT * FROM check_ign_err; +DROP TRIGGER trig_before_part on check_ign_err; +DROP TABLE trig_test; +DROP TABLE check_ign_err CASCADE; + -- clean up DROP TABLE forcetest; DROP TABLE vistest; -- 2.25.1