>
From 09befdad45a6b1ae70d6c5abc90d1c2296e56ee1 Mon Sep 17 00:00:00 2001
From: Damir Belyalov <[email protected]>
Date: Fri, 15 Oct 2021 11:55:18 +0300
Subject: [PATCH] COPY_IGNORE_ERRORS with GUC for replay_buffer size
---
doc/src/sgml/config.sgml | 17 ++
doc/src/sgml/ref/copy.sgml | 19 ++
src/backend/commands/copy.c | 8 +
src/backend/commands/copyfrom.c | 114 +++++++++++-
src/backend/commands/copyfromparse.c | 169 ++++++++++++++++++
src/backend/parser/gram.y | 8 +-
src/backend/utils/misc/guc.c | 11 ++
src/backend/utils/misc/postgresql.conf.sample | 2 +
src/bin/psql/tab-complete.c | 3 +-
src/include/commands/copy.h | 6 +
src/include/commands/copyfrom_internal.h | 19 ++
src/include/parser/kwlist.h | 1 +
src/test/regress/expected/copy2.out | 130 ++++++++++++++
src/test/regress/sql/copy2.sql | 116 ++++++++++++
14 files changed, 617 insertions(+), 6 deletions(-)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 37fd80388c..69373b8d8c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1961,6 +1961,23 @@ include_dir 'conf.d'
</listitem>
</varlistentry>
+ <varlistentry id="guc-replay-buffer-size" xreflabel="replay_buffer_size">
+ <term><varname>replay_buffer_size</varname> (<type>integer</type>)
+ <indexterm>
+ <primary><varname>replay_buffer_size</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Specifies the size of the buffer for the COPY FROM IGNORE_ERRORS option. This
+ buffer is created when a subtransaction begins and accumulates tuples until an
+ error occurs. Then the stored tuples are replayed. The buffer size is the size
+ of the subtransaction. Therefore, on large tables, it should be increased in
+ order to avoid exceeding the maximum number of subtransactions.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
<term><varname>max_stack_depth</varname> (<type>integer</type>)
<indexterm>
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8aae711b3b..7ff6f6dea7 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
FORMAT <replaceable class="parameter">format_name</replaceable>
FREEZE [ <replaceable class="parameter">boolean</replaceable> ]
+ IGNORE_ERRORS [ <replaceable class="parameter">boolean</replaceable> ]
DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>'
NULL '<replaceable class="parameter">null_string</replaceable>'
HEADER [ <replaceable class="parameter">boolean</replaceable> | MATCH ]
@@ -233,6 +234,24 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>IGNORE_ERRORS</literal></term>
+ <listitem>
+ <para>
+ Drop rows that contain malformed data while copying. These are rows
+ containing syntax errors in data, rows with too many or too few columns,
+ rows that result in constraint violations, and rows containing columns where
+ the data type's input function raises an error.
+ Outputs warnings about rows with incorrect data (no more than 100
+ warnings are emitted) and the total number of errors.
+ The option is implemented with subtransactions and a buffer created in
+ them, which accumulates tuples until an error occurs.
+ The size of the buffer is the size of the subtransaction block.
+ It is set by the GUC parameter <varname>replay_buffer_size</varname>, which is 1000 by default.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry>
<term><literal>DELIMITER</literal></term>
<listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3ac731803b..fead1aba46 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -402,6 +402,7 @@ ProcessCopyOptions(ParseState *pstate,
{
bool format_specified = false;
bool freeze_specified = false;
+ bool ignore_errors_specified = false;
bool header_specified = false;
ListCell *option;
@@ -442,6 +443,13 @@ ProcessCopyOptions(ParseState *pstate,
freeze_specified = true;
opts_out->freeze = defGetBoolean(defel);
}
+ else if (strcmp(defel->defname, "ignore_errors") == 0)
+ {
+ if (ignore_errors_specified)
+ errorConflictingDefElem(defel, pstate);
+ ignore_errors_specified = true;
+ opts_out->ignore_errors = defGetBoolean(defel);
+ }
else if (strcmp(defel->defname, "delimiter") == 0)
{
if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a976008b3d..7e997d15c6 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -73,6 +73,11 @@
/* Trim the list of buffers back down to this number after flushing */
#define MAX_PARTITION_BUFFERS 32
+/*
+ * GUC parameters
+ */
+int replay_buffer_size;
+
/* Stores multi-insert data related to a single relation in CopyFrom. */
typedef struct CopyMultiInsertBuffer
{
@@ -100,12 +105,13 @@ typedef struct CopyMultiInsertInfo
int ti_options; /* table insert options */
} CopyMultiInsertInfo;
-
/* non-export function prototypes */
static char *limit_printout_length(const char *str);
static void ClosePipeFromProgram(CopyFromState cstate);
+static void safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate);
+
/*
* error context callback for COPY FROM
*
@@ -521,6 +527,61 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
miinfo->bufferedBytes += tuplen;
}
+/*
+ * safeExecConstraints
+ *
+ * Wrapper around ExecConstraints() used when IGNORE_ERRORS is enabled.
+ *
+ * Runs ExecConstraints() under PG_TRY. If it raises one of the
+ * constraint-violation error codes listed in the switch below, the current
+ * subtransaction is rolled back, the error is counted (and reported as a
+ * WARNING for the first 100 errors) and safecstate->skip_row is set so that
+ * the caller can skip the row. Any other error is re-thrown.
+ */
+static void
+safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate)
+{
+ SafeCopyFromState *safecstate = cstate->safecstate;
+
+ /* Reset on every call; set again below only if a violation is caught */
+ safecstate->skip_row = false;
+
+ PG_TRY();
+ ExecConstraints(resultRelInfo, myslot, estate);
+ PG_CATCH();
+ {
+ ErrorData *errdata;
+ MemoryContext cxt;
+
+ /* Copy the error data into the memory context saved in safecstate */
+ cxt = MemoryContextSwitchTo(safecstate->oldcontext);
+ errdata = CopyErrorData();
+
+ /* Abort the subtransaction and restore the saved resource owner */
+ RollbackAndReleaseCurrentSubTransaction();
+ CurrentResourceOwner = safecstate->oldowner;
+
+ switch (errdata->sqlerrcode)
+ {
+ /* Ignore Constraint Violation */
+ case ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION:
+ case ERRCODE_RESTRICT_VIOLATION:
+ case ERRCODE_NOT_NULL_VIOLATION:
+ case ERRCODE_FOREIGN_KEY_VIOLATION:
+ case ERRCODE_UNIQUE_VIOLATION:
+ case ERRCODE_CHECK_VIOLATION:
+ case ERRCODE_EXCLUSION_VIOLATION:
+ safecstate->errors++;
+ /* The warning text is the context string of the caught error */
+ if (cstate->safecstate->errors <= 100)
+ ereport(WARNING,
+ (errcode(errdata->sqlerrcode),
+ errmsg("%s", errdata->context)));
+
+ /* Request a new subtransaction and tell the caller to skip the row */
+ safecstate->begin_subtransaction = true;
+ safecstate->skip_row = true;
+ break;
+ default:
+ /* Not one of the ignorable error codes: propagate the error */
+ PG_RE_THROW();
+ }
+
+ /* The error was handled: clear the error state and free our copy */
+ FlushErrorState();
+ FreeErrorData(errdata);
+ errdata = NULL;
+
+ MemoryContextSwitchTo(cxt);
+ }
+ PG_END_TRY();
+}
+
+
/*
* Copy FROM file to relation.
*/
@@ -535,6 +596,7 @@ CopyFrom(CopyFromState cstate)
ExprContext *econtext;
TupleTableSlot *singleslot = NULL;
MemoryContext oldcontext = CurrentMemoryContext;
+ ResourceOwner oldowner = CurrentResourceOwner;
PartitionTupleRouting *proute = NULL;
ErrorContextCallback errcallback;
@@ -819,9 +881,30 @@ CopyFrom(CopyFromState cstate)
errcallback.previous = error_context_stack;
error_context_stack = &errcallback;
+ /* Initialize safeCopyFromState for IGNORE_ERRORS option */
+ if (cstate->opts.ignore_errors)
+ {
+ cstate->safecstate = palloc(sizeof(SafeCopyFromState));
+
+ /* Create replay_buffer in oldcontext */
+ cstate->safecstate->replay_buffer = (HeapTuple *) palloc(replay_buffer_size * sizeof(HeapTuple));
+
+ cstate->safecstate->saved_tuples = 0;
+ cstate->safecstate->replayed_tuples = 0;
+ cstate->safecstate->errors = 0;
+ cstate->safecstate->replay_is_active = false;
+ cstate->safecstate->begin_subtransaction = true;
+ cstate->safecstate->processed_remaining_tuples = false;
+
+ cstate->safecstate->oldowner = oldowner;
+ cstate->safecstate->oldcontext = oldcontext;
+ cstate->safecstate->insertMethod = insertMethod;
+ }
+
for (;;)
{
TupleTableSlot *myslot;
+ bool valid_row;
bool skip_tuple;
CHECK_FOR_INTERRUPTS();
@@ -855,8 +938,21 @@ CopyFrom(CopyFromState cstate)
ExecClearTuple(myslot);
- /* Directly store the values/nulls array in the slot */
- if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+ /*
+ * NextCopyFrom() directly store the values/nulls array in the slot.
+ * safeNextCopyFrom() ignores rows with errors if IGNORE_ERRORS is enabled.
+ */
+ if (!cstate->safecstate)
+ valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+ else
+ {
+ valid_row = safeNextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+
+ if (cstate->safecstate->skip_row)
+ continue;
+ }
+
+ if (!valid_row)
break;
ExecStoreVirtualTuple(myslot);
@@ -1035,7 +1131,17 @@ CopyFrom(CopyFromState cstate)
*/
if (resultRelInfo->ri_FdwRoutine == NULL &&
resultRelInfo->ri_RelationDesc->rd_att->constr)
- ExecConstraints(resultRelInfo, myslot, estate);
+ {
+ if (cstate->opts.ignore_errors)
+ {
+ safeExecConstraints(cstate, resultRelInfo, myslot, estate);
+
+ if (cstate->safecstate->skip_row)
+ continue;
+ }
+ else
+ ExecConstraints(resultRelInfo, myslot, estate);
+ }
/*
* Also check the tuple against the partition constraint, if
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 57813b3458..1aae27d80d 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -1026,6 +1026,175 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
return true;
}
+/*
+ * safeNextCopyFrom
+ *
+ * Analog of NextCopyFrom() but skips rows with errors while copying; used
+ * when COPY FROM runs with IGNORE_ERRORS.
+ *
+ * Each call performs one step of a buffer/replay cycle:
+ * - While not replaying, a row is read with NextCopyFrom() inside an internal
+ * subtransaction and saved in safecstate->replay_buffer; skip_row is set so
+ * that the caller does not process the row yet.
+ * - Once the buffer holds replay_buffer_size tuples, or NextCopyFrom() returns
+ * false while tuples are still buffered, the subtransaction is released and
+ * subsequent calls hand back the buffered tuples one at a time in
+ * values/nulls.
+ * - If an error with one of the codes listed in the PG_CATCH block is raised,
+ * the subtransaction is rolled back, the error is counted and the row is
+ * skipped; any other error is re-thrown.
+ *
+ * Returns false when NextCopyFrom() reports no further row and nothing remains
+ * to be replayed (after reporting the total number of errors), true otherwise.
+ * When true is returned the caller must check safecstate->skip_row: if it is
+ * set, the caller should not process values/nulls for this call.
+ */
+bool
+safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+ SafeCopyFromState *safecstate = cstate->safecstate;
+ bool valid_row = true;
+
+ /* Reset on every call; the branches below set it when nothing is returned */
+ safecstate->skip_row = false;
+
+ PG_TRY();
+ {
+ if (!safecstate->replay_is_active)
+ {
+ /* Buffering phase: read rows inside a subtransaction */
+ if (safecstate->begin_subtransaction)
+ {
+ BeginInternalSubTransaction(NULL);
+ /* Keep using the resource owner saved by CopyFrom() */
+ CurrentResourceOwner = safecstate->oldowner;
+
+ safecstate->begin_subtransaction = false;
+ }
+
+ if (safecstate->saved_tuples < replay_buffer_size)
+ {
+ valid_row = NextCopyFrom(cstate, econtext, values, nulls);
+ if (valid_row)
+ {
+ /* Fill replay_buffer in CopyFrom() oldcontext */
+ MemoryContext cxt = MemoryContextSwitchTo(safecstate->oldcontext);
+
+ safecstate->replay_buffer[safecstate->saved_tuples++] = heap_form_tuple(RelationGetDescr(cstate->rel), values, nulls);
+ MemoryContextSwitchTo(cxt);
+
+ /* The row is only buffered for now; it is returned during replay */
+ safecstate->skip_row = true;
+ }
+ else if (!safecstate->processed_remaining_tuples)
+ {
+ /* No further row was read: commit the subtransaction */
+ ReleaseCurrentSubTransaction();
+ CurrentResourceOwner = safecstate->oldowner;
+
+ if (safecstate->replayed_tuples < safecstate->saved_tuples)
+ {
+ /* Prepare to replay remaining tuples if they exist */
+ safecstate->replay_is_active = true;
+ safecstate->processed_remaining_tuples = true;
+ safecstate->skip_row = true;
+ return true;
+ }
+ }
+ }
+ else
+ {
+ /* Buffer was filled, commit subtransaction and prepare to replay */
+ ReleaseCurrentSubTransaction();
+ CurrentResourceOwner = safecstate->oldowner;
+
+ safecstate->replay_is_active = true;
+ safecstate->begin_subtransaction = true;
+ safecstate->skip_row = true;
+ }
+ }
+ else
+ {
+ /* Replay phase: hand back buffered tuples one per call */
+ if (safecstate->replayed_tuples < safecstate->saved_tuples)
+ /* Replaying tuple */
+ heap_deform_tuple(safecstate->replay_buffer[safecstate->replayed_tuples++], RelationGetDescr(cstate->rel), values, nulls);
+ else
+ {
+ /*
+ * Clean up replay_buffer.
+ * NOTE(review): this only zeroes the pointer array; the tuples
+ * themselves are not freed here -- confirm whether they are
+ * released elsewhere.
+ */
+ MemSet(safecstate->replay_buffer, 0, replay_buffer_size * sizeof(HeapTuple));
+ safecstate->saved_tuples = safecstate->replayed_tuples = 0;
+
+ safecstate->replay_is_active = false;
+ safecstate->skip_row = true;
+ }
+ }
+ }
+ PG_CATCH();
+ {
+ ErrorData *errdata;
+ MemoryContext cxt;
+
+ /* Copy the error data into the memory context saved in safecstate */
+ cxt = MemoryContextSwitchTo(safecstate->oldcontext);
+ errdata = CopyErrorData();
+
+ /* Abort the subtransaction and restore the saved resource owner */
+ RollbackAndReleaseCurrentSubTransaction();
+ CurrentResourceOwner = safecstate->oldowner;
+
+ switch (errdata->sqlerrcode)
+ {
+ /* Ignore malformed data */
+ case ERRCODE_DATA_EXCEPTION:
+ case ERRCODE_ARRAY_ELEMENT_ERROR:
+ case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE:
+ case ERRCODE_INTERVAL_FIELD_OVERFLOW:
+ case ERRCODE_INVALID_CHARACTER_VALUE_FOR_CAST:
+ case ERRCODE_INVALID_DATETIME_FORMAT:
+ case ERRCODE_INVALID_ESCAPE_CHARACTER:
+ case ERRCODE_INVALID_ESCAPE_SEQUENCE:
+ case ERRCODE_NONSTANDARD_USE_OF_ESCAPE_CHARACTER:
+ case ERRCODE_INVALID_PARAMETER_VALUE:
+ case ERRCODE_INVALID_TABLESAMPLE_ARGUMENT:
+ case ERRCODE_INVALID_TIME_ZONE_DISPLACEMENT_VALUE:
+ case ERRCODE_NULL_VALUE_NOT_ALLOWED:
+ case ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE:
+ case ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED:
+ case ERRCODE_STRING_DATA_LENGTH_MISMATCH:
+ case ERRCODE_STRING_DATA_RIGHT_TRUNCATION:
+ case ERRCODE_INVALID_TEXT_REPRESENTATION:
+ case ERRCODE_INVALID_BINARY_REPRESENTATION:
+ case ERRCODE_BAD_COPY_FILE_FORMAT:
+ case ERRCODE_UNTRANSLATABLE_CHARACTER:
+ case ERRCODE_DUPLICATE_JSON_OBJECT_KEY_VALUE:
+ case ERRCODE_INVALID_ARGUMENT_FOR_SQL_JSON_DATETIME_FUNCTION:
+ case ERRCODE_INVALID_JSON_TEXT:
+ case ERRCODE_INVALID_SQL_JSON_SUBSCRIPT:
+ case ERRCODE_MORE_THAN_ONE_SQL_JSON_ITEM:
+ case ERRCODE_NO_SQL_JSON_ITEM:
+ case ERRCODE_NON_NUMERIC_SQL_JSON_ITEM:
+ case ERRCODE_NON_UNIQUE_KEYS_IN_A_JSON_OBJECT:
+ case ERRCODE_SINGLETON_SQL_JSON_ITEM_REQUIRED:
+ case ERRCODE_SQL_JSON_ARRAY_NOT_FOUND:
+ case ERRCODE_SQL_JSON_MEMBER_NOT_FOUND:
+ case ERRCODE_SQL_JSON_NUMBER_NOT_FOUND:
+ case ERRCODE_SQL_JSON_OBJECT_NOT_FOUND:
+ case ERRCODE_TOO_MANY_JSON_ARRAY_ELEMENTS:
+ case ERRCODE_TOO_MANY_JSON_OBJECT_MEMBERS:
+ case ERRCODE_SQL_JSON_SCALAR_REQUIRED:
+ case ERRCODE_SQL_JSON_ITEM_CANNOT_BE_CAST_TO_TARGET_TYPE:
+ safecstate->errors++;
+ /* Only the first 100 errors are reported; text is the error context */
+ if (safecstate->errors <= 100)
+ ereport(WARNING,
+ (errcode(errdata->sqlerrcode),
+ errmsg("%s", errdata->context)));
+
+ /* Start a fresh subtransaction on the next call and skip this row */
+ safecstate->begin_subtransaction = true;
+ safecstate->skip_row = true;
+ break;
+ default:
+ /* Not one of the ignorable error codes: propagate the error */
+ PG_RE_THROW();
+ }
+
+ /* The error was handled: clear the error state and free our copy */
+ FlushErrorState();
+ FreeErrorData(errdata);
+ errdata = NULL;
+
+ MemoryContextSwitchTo(cxt);
+ }
+ PG_END_TRY();
+
+ if (!valid_row)
+ {
+ /* Report the total error count: NOTICE when zero, WARNING otherwise */
+ if (safecstate->errors == 0)
+ ereport(NOTICE,
+ errmsg("FIND %d ERRORS", safecstate->errors));
+ else if (safecstate->errors == 1)
+ ereport(WARNING,
+ errmsg("FIND %d ERROR", safecstate->errors));
+ else
+ ereport(WARNING,
+ errmsg("FIND %d ERRORS", safecstate->errors));
+
+ return false;
+ }
+
+ return true;
+}
+
+
/*
* Read the next input line and stash it in line_buf.
*
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index df5ceea910..3bb7235b34 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -800,7 +800,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
HANDLER HAVING HEADER_P HOLD HOUR_P
- IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
+ IDENTITY_P IF_P IGNORE_ERRORS ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
@@ -3456,6 +3456,10 @@ copy_opt_item:
{
$$ = makeDefElem("freeze", (Node *) makeBoolean(true), @1);
}
+ | IGNORE_ERRORS
+ {
+ $$ = makeDefElem("ignore_errors", (Node *) makeBoolean(true), @1);
+ }
| DELIMITER opt_as Sconst
{
$$ = makeDefElem("delimiter", (Node *) makeString($3), @1);
@@ -17814,6 +17818,7 @@ unreserved_keyword:
| HOUR_P
| IDENTITY_P
| IF_P
+ | IGNORE_ERRORS
| IMMEDIATE
| IMMUTABLE
| IMPLICIT_P
@@ -18393,6 +18398,7 @@ bare_label_keyword:
| HOLD
| IDENTITY_P
| IF_P
+ | IGNORE_ERRORS
| ILIKE
| IMMEDIATE
| IMMUTABLE
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0328029d43..54209a4a3c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -49,6 +49,7 @@
#include "catalog/pg_parameter_acl.h"
#include "catalog/storage.h"
#include "commands/async.h"
+#include "commands/copy.h"
#include "commands/prepare.h"
#include "commands/tablespace.h"
#include "commands/trigger.h"
@@ -2527,6 +2528,16 @@ static struct config_int ConfigureNamesInt[] =
NULL, NULL, NULL
},
+ {
+ {"replay_buffer_size", PGC_USERSET, RESOURCES_MEM,
+ gettext_noop("Sets the size of replay buffer for COPY FROM IGNORE_ERRORS option"),
+ NULL
+ },
+ &replay_buffer_size,
+ 1000, 1, INT_MAX,
+ NULL, NULL, NULL
+ },
+
/*
* We use the hopefully-safely-small value of 100kB as the compiled-in
* default for max_stack_depth. InitializeGUCOptions will increase it if
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b4bc06e5f5..f4e777a0a3 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -155,6 +155,8 @@
# mmap
# (change requires restart)
#min_dynamic_shared_memory = 0MB # (change requires restart)
+#replay_buffer_size = 1000 # min 1; tuples buffered per
+ # subtransaction by COPY FROM IGNORE_ERRORS
# - Disk -
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index e572f585ef..feaf18b043 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2742,7 +2742,8 @@ psql_completion(const char *text, int start, int end)
else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "("))
COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL",
"HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE",
- "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING");
+ "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING",
+ "IGNORE_ERRORS");
/* Complete COPY <sth> FROM|TO filename WITH (FORMAT */
else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT"))
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index cb0096aeb6..fc9f559efe 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -19,6 +19,9 @@
#include "parser/parse_node.h"
#include "tcop/dest.h"
+/* User-settable GUC parameters */
+extern PGDLLIMPORT int replay_buffer_size;
+
/*
* Represents whether a header line should be present, and whether it must
* match the actual names (which implies "true").
@@ -42,6 +45,7 @@ typedef struct CopyFormatOptions
* -1 if not specified */
bool binary; /* binary format? */
bool freeze; /* freeze rows on loading? */
+ bool ignore_errors; /* ignore rows with errors */
bool csv_mode; /* Comma Separated Value format? */
CopyHeaderChoice header_line; /* header line? */
char *null_print; /* NULL marker string (server encoding!) */
@@ -78,6 +82,8 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
extern void EndCopyFrom(CopyFromState cstate);
extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
Datum *values, bool *nulls);
+extern bool safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+ Datum *values, bool *nulls);
extern bool NextCopyFromRawFields(CopyFromState cstate,
char ***fields, int *nfields);
extern void CopyFromErrorCallback(void *arg);
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 3df1c5a97c..4227a7babd 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
#include "commands/copy.h"
#include "commands/trigger.h"
+#include "utils/resowner.h"
/*
* Represents the different source cases we need to worry about at
@@ -49,6 +50,23 @@ typedef enum CopyInsertMethod
CIM_MULTI_CONDITIONAL /* use table_multi_insert only if valid */
} CopyInsertMethod;
+ /* Struct holding the state used by the COPY FROM IGNORE_ERRORS option. */
+typedef struct SafeCopyFromState
+{
+ HeapTuple *replay_buffer; /* buffered tuples waiting to be replayed */
+ int saved_tuples; /* # of tuples in replay_buffer */
+ int replayed_tuples; /* # of tuples already replayed from replay_buffer */
+ int errors; /* total # of errors */
+ bool replay_is_active; /* true while buffered tuples are being replayed */
+ bool begin_subtransaction; /* if it's true, start a subtransaction on the next read */
+ bool processed_remaining_tuples; /* set once the tuples left at end of input are scheduled for replay */
+ bool skip_row; /* if it's true, the caller should skip this row */
+
+ MemoryContext oldcontext; /* CopyFrom() memory context; replay_buffer is created there */
+ ResourceOwner oldowner; /* CopyFrom() resource owner */
+ CopyInsertMethod insertMethod; /* copy of CopyFrom()'s insertMethod */
+} SafeCopyFromState;
+
+
/*
* This struct contains all the state variables used throughout a COPY FROM
* operation.
@@ -71,6 +89,7 @@ typedef struct CopyFromStateData
char *filename; /* filename, or NULL for STDIN */
bool is_program; /* is 'filename' a program to popen? */
copy_data_source_cb data_source_cb; /* function for reading data */
+ SafeCopyFromState *safecstate; /* struct for ignore_errors option */
CopyFormatOptions opts;
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index ae35f03251..2af11bd359 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -201,6 +201,7 @@ PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD, AS_LABEL)
PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("ignore_errors", IGNORE_ERRORS, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL)
PG_KEYWORD("immediate", IMMEDIATE, UNRESERVED_KEYWORD, BARE_LABEL)
PG_KEYWORD("immutable", IMMUTABLE, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 5f3685e9ef..093c7958be 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -649,6 +649,136 @@ SELECT * FROM instead_of_insert_tbl;
(2 rows)
COMMIT;
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+WARNING: COPY check_ign_err, line 2: "2 {2} 2 2"
+WARNING: COPY check_ign_err, line 3: "3 {3}"
+WARNING: COPY check_ign_err, line 4, column n: "a"
+WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING: COPY check_ign_err, line 6, column n: ""
+WARNING: COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING: FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n | m | k
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[]);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+ BEGIN
+ INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+ RETURN NEW;
+ END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+WARNING: COPY check_ign_err, line 2: "2 {2} 2 2"
+WARNING: COPY check_ign_err, line 3: "3 {3}"
+WARNING: COPY check_ign_err, line 4, column n: "a"
+WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING: COPY check_ign_err, line 6, column n: ""
+WARNING: COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING: FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n | m | k
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TRIGGER trig_before on check_ign_err;
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+ BEGIN
+ INSERT INTO check_ign_err VALUES(NEW.n, NEW.m, NEW.k);
+ RETURN NEW;
+ END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS;
+WARNING: COPY check_ign_err_view, line 2: "2 {2} 2 2"
+WARNING: COPY check_ign_err_view, line 3: "3 {3}"
+WARNING: COPY check_ign_err_view, line 4, column n: "a"
+WARNING: COPY check_ign_err_view, line 5, column n: "5 {5} 5555555555"
+WARNING: COPY check_ign_err_view, line 6, column n: ""
+WARNING: COPY check_ign_err_view, line 7, column m: "{a, 7}"
+WARNING: COPY check_ign_err_view, line 8, column n: "8 {8} 8"
+WARNING: FIND 7 ERRORS
+SELECT * FROM check_ign_err_view;
+ n | m | k
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+-- foreign table case in postgres_fdw extension
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+ WHERE n = floor(random()*(1-1+1))+1; /* find values equal 1 */
+WARNING: COPY check_ign_err, line 2: "2 {2} 2 2"
+WARNING: COPY check_ign_err, line 3: "3 {3}"
+WARNING: COPY check_ign_err, line 4, column n: "a"
+WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING: COPY check_ign_err, line 6, column n: ""
+WARNING: COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING: FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n | m | k
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TABLE check_ign_err;
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int)
+ PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+ FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+ FOR VALUES FROM (4) TO (8);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+ BEGIN
+ INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+ RETURN NEW;
+ END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+WARNING: COPY check_ign_err, line 2: "2 {2} 2 2"
+WARNING: COPY check_ign_err, line 3: "3 {3}"
+WARNING: COPY check_ign_err, line 4, column n: "a"
+WARNING: COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING: COPY check_ign_err, line 6, column n: ""
+WARNING: COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING: COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING: FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n | m | k
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
-- clean up
DROP TABLE forcetest;
DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index b3c16af48e..c91122aa1e 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -454,6 +454,122 @@ test1
SELECT * FROM instead_of_insert_tbl;
COMMIT;
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+1 {1} 1
+2 {2} 2 2
+3 {3}
+a {4} 4
+5 {5} 5555555555
+
+7 {a, 7} 7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[]);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+ BEGIN
+ INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+ RETURN NEW;
+ END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+1 {1} 1
+2 {2} 2 2
+3 {3}
+a {4} 4
+5 {5} 5555555555
+
+7 {a, 7} 7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before on check_ign_err;
+
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+ BEGIN
+ INSERT INTO check_ign_err VALUES(NEW.n, NEW.m, NEW.k);
+ RETURN NEW;
+ END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS;
+1 {1} 1
+2 {2} 2 2
+3 {3}
+a {4} 4
+5 {5} 5555555555
+
+7 {a, 7} 7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err_view;
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+
+-- foreign table case in postgres_fdw extension
+
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+ WHERE n = floor(random()*(1-1+1))+1; /* find values equal 1 */
+1 {1} 1
+2 {2} 2 2
+3 {3}
+a {4} 4
+5 {5} 5555555555
+
+7 {a, 7} 7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+DROP TABLE check_ign_err;
+
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int)
+ PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+ FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+ FOR VALUES FROM (4) TO (8);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+ BEGIN
+ INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+ RETURN NEW;
+ END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+1 {1} 1
+2 {2} 2 2
+3 {3}
+a {4} 4
+5 {5} 5555555555
+
+7 {a, 7} 7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
+
-- clean up
DROP TABLE forcetest;
DROP TABLE vistest;
--
2.25.1