>
From 09befdad45a6b1ae70d6c5abc90d1c2296e56ee1 Mon Sep 17 00:00:00 2001
From: Damir Belyalov <dam.be...@gmail.com>
Date: Fri, 15 Oct 2021 11:55:18 +0300
Subject: [PATCH] COPY_IGNORE_ERRORS with GUC for replay_buffer size

---
 doc/src/sgml/config.sgml                      |  17 ++
 doc/src/sgml/ref/copy.sgml                    |  19 ++
 src/backend/commands/copy.c                   |   8 +
 src/backend/commands/copyfrom.c               | 114 +++++++++++-
 src/backend/commands/copyfromparse.c          | 169 ++++++++++++++++++
 src/backend/parser/gram.y                     |   8 +-
 src/backend/utils/misc/guc.c                  |  11 ++
 src/backend/utils/misc/postgresql.conf.sample |   2 +
 src/bin/psql/tab-complete.c                   |   3 +-
 src/include/commands/copy.h                   |   6 +
 src/include/commands/copyfrom_internal.h      |  19 ++
 src/include/parser/kwlist.h                   |   1 +
 src/test/regress/expected/copy2.out           | 130 ++++++++++++++
 src/test/regress/sql/copy2.sql                | 116 ++++++++++++
 14 files changed, 617 insertions(+), 6 deletions(-)

diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 37fd80388c..69373b8d8c 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -1961,6 +1961,23 @@ include_dir 'conf.d'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-replay-buffer-size" xreflabel="replay_buffer_size">
+      <term><varname>replay_buffer_size</varname> (<type>integer</type>)
+      <indexterm>
+       <primary><varname>replay_buffer_size</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Sets the number of rows that <command>COPY FROM</command> with the
+        <literal>IGNORE_ERRORS</literal> option parses and holds in memory
+        before inserting them. Parsing is done in subtransactions; a new one is
+        started each time the buffer has been emptied and after each skipped
+        row, so larger values mean fewer subtransactions but more memory use.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-max-stack-depth" xreflabel="max_stack_depth">
       <term><varname>max_stack_depth</varname> (<type>integer</type>)
       <indexterm>
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 8aae711b3b..7ff6f6dea7 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
 
     FORMAT <replaceable class="parameter">format_name</replaceable>
     FREEZE [ <replaceable class="parameter">boolean</replaceable> ]
+    IGNORE_ERRORS [ <replaceable class="parameter">boolean</replaceable> ]
     DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>'
     NULL '<replaceable class="parameter">null_string</replaceable>'
     HEADER [ <replaceable class="parameter">boolean</replaceable> | MATCH ]
@@ -233,6 +234,24 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>IGNORE_ERRORS</literal></term>
+    <listitem>
+     <para>
+      Skip rows that contain malformed data instead of failing: rows with
+      syntax errors, rows with too many or too few columns, rows for which a
+      column's data type input function raises an error, and rows that
+      violate a <literal>NOT NULL</literal> or <literal>CHECK</literal>
+      constraint. A warning is emitted for each skipped row (at most 100
+      warnings), and the total number of errors is reported at the end.
+      The option is implemented with subtransactions: rows are accumulated
+      in a buffer until it is full or the input ends, and are then inserted.
+      The buffer size, in rows, is set by the <varname>replay_buffer_size</varname>
+      configuration parameter (1000 by default).
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>DELIMITER</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 3ac731803b..fead1aba46 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -402,6 +402,7 @@ ProcessCopyOptions(ParseState *pstate,
 {
 	bool		format_specified = false;
 	bool		freeze_specified = false;
+	bool		ignore_errors_specified = false;
 	bool		header_specified = false;
 	ListCell   *option;
 
@@ -442,6 +443,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+				errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index a976008b3d..7e997d15c6 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -73,6 +73,11 @@
 /* Trim the list of buffers back down to this number after flushing */
 #define MAX_PARTITION_BUFFERS	32
 
+/*
+ * GUC parameters (initialized to the boot value declared in guc.c)
+ */
+int		replay_buffer_size = 1000;
+
 /* Stores multi-insert data related to a single relation in CopyFrom. */
 typedef struct CopyMultiInsertBuffer
 {
@@ -100,12 +105,13 @@ typedef struct CopyMultiInsertInfo
 	int			ti_options;		/* table insert options */
 } CopyMultiInsertInfo;
 
-
 /* non-export function prototypes */
 static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static void safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate);
+
 /*
  * error context callback for COPY FROM
  *
@@ -521,6 +527,86 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	miinfo->bufferedBytes += tuplen;
 }
 
+/*
+ * Check the constraints of one replayed row, dropping it on a violation.
+ *
+ * Rows only reach here while safeNextCopyFrom() is replaying its buffer,
+ * after the parsing subtransaction has been released, so a subtransaction
+ * of our own is opened around ExecConstraints().  Rolling back "the current
+ * subtransaction" without having started one would act on the enclosing
+ * transaction level instead of on this row.
+ *
+ * On an ignorable constraint violation a WARNING is emitted (at most 100 in
+ * total), safecstate->errors is incremented and safecstate->skip_row is set
+ * so that CopyFrom() drops the row.  Any other error is re-thrown.
+ *
+ * NOTE(review): the WARNING text is the COPY error context; during replay
+ * that presumably describes the last line read, not the replayed row.
+ */
+static void
+safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate)
+{
+	SafeCopyFromState *safecstate = cstate->safecstate;
+	MemoryContext oldcontext = CurrentMemoryContext;
+	ResourceOwner oldowner = CurrentResourceOwner;
+
+	safecstate->skip_row = false;
+
+	BeginInternalSubTransaction(NULL);
+	/* Keep evaluating in the caller's memory context */
+	MemoryContextSwitchTo(oldcontext);
+
+	PG_TRY();
+	{
+		ExecConstraints(resultRelInfo, myslot, estate);
+
+		/* Row passed: commit the subtransaction, return to outer level */
+		ReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+	}
+	PG_CATCH();
+	{
+		ErrorData  *errdata;
+
+		/* Save the error out of ErrorContext, then abort the subtransaction */
+		MemoryContextSwitchTo(safecstate->oldcontext);
+		errdata = CopyErrorData();
+		FlushErrorState();
+
+		RollbackAndReleaseCurrentSubTransaction();
+		MemoryContextSwitchTo(oldcontext);
+		CurrentResourceOwner = oldowner;
+
+		switch (errdata->sqlerrcode)
+		{
+			/* Ignore Constraint Violation */
+			case ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION:
+			case ERRCODE_RESTRICT_VIOLATION:
+			case ERRCODE_NOT_NULL_VIOLATION:
+			case ERRCODE_FOREIGN_KEY_VIOLATION:
+			case ERRCODE_UNIQUE_VIOLATION:
+			case ERRCODE_CHECK_VIOLATION:
+			case ERRCODE_EXCLUSION_VIOLATION:
+				safecstate->errors++;
+				if (safecstate->errors <= 100)
+					ereport(WARNING,
+							(errcode(errdata->sqlerrcode),
+							 errmsg("%s", errdata->context)));
+
+				/* Our subtransaction is private: leave begin_subtransaction alone */
+				safecstate->skip_row = true;
+				break;
+			default:
+				/* Not an ignorable violation: propagate it unchanged */
+				ReThrowError(errdata);
+		}
+
+		FreeErrorData(errdata);
+	}
+	PG_END_TRY();
+}
+
 /*
  * Copy FROM file to relation.
  */
@@ -535,6 +596,7 @@ CopyFrom(CopyFromState cstate)
 	ExprContext *econtext;
 	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
+	ResourceOwner oldowner = CurrentResourceOwner;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
@@ -819,9 +881,30 @@ CopyFrom(CopyFromState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	/* Initialize safeCopyFromState for IGNORE_ERRORS option */
+	if (cstate->opts.ignore_errors)
+	{
+		cstate->safecstate = palloc(sizeof(SafeCopyFromState));
+
+		/* Create replay_buffer in oldcontext */
+		cstate->safecstate->replay_buffer = (HeapTuple *) palloc(replay_buffer_size * sizeof(HeapTuple));
+
+		cstate->safecstate->saved_tuples = 0;
+		cstate->safecstate->replayed_tuples = 0;
+		cstate->safecstate->errors = 0;
+		cstate->safecstate->replay_is_active = false;
+		cstate->safecstate->begin_subtransaction = true;
+		cstate->safecstate->processed_remaining_tuples = false;
+
+		cstate->safecstate->oldowner = oldowner;
+		cstate->safecstate->oldcontext = oldcontext;
+		cstate->safecstate->insertMethod = insertMethod;
+	}
+
 	for (;;)
 	{
 		TupleTableSlot *myslot;
+		bool		valid_row;
 		bool		skip_tuple;
 
 		CHECK_FOR_INTERRUPTS();
@@ -855,8 +938,21 @@ CopyFrom(CopyFromState cstate)
 
 		ExecClearTuple(myslot);
 
-		/* Directly store the values/nulls array in the slot */
-		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+		/*
+		 * NextCopyFrom() directly store the values/nulls array in the slot.
+		 * safeNextCopyFrom() ignores rows with errors if IGNORE_ERRORS is enabled.
+		 */
+		if (!cstate->safecstate)
+			valid_row = NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+		else
+		{
+			valid_row = safeNextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull);
+
+			if (cstate->safecstate->skip_row)
+				continue;
+		}
+
+		if (!valid_row)
 			break;
 
 		ExecStoreVirtualTuple(myslot);
@@ -1035,7 +1131,17 @@ CopyFrom(CopyFromState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, myslot, estate);
+				{
+					if (cstate->opts.ignore_errors)
+					{
+						safeExecConstraints(cstate, resultRelInfo, myslot, estate);
+
+						if (cstate->safecstate->skip_row)
+							continue;
+					}
+					else
+						ExecConstraints(resultRelInfo, myslot, estate);
+				}
 
 				/*
 				 * Also check the tuple against the partition constraint, if
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 57813b3458..1aae27d80d 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -1026,6 +1026,213 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 	return true;
 }
 
+/*
+ * Analog of NextCopyFrom() that skips rows with malformed data.
+ *
+ * It is driven by CopyFrom()'s loop and alternates between two phases:
+ *
+ * - Filling: each call parses one row inside a subtransaction and saves it
+ *   as a heap tuple in replay_buffer, setting skip_row so that CopyFrom()
+ *   does not insert anything yet.  An ignorable data error rolls the
+ *   subtransaction back, emits a WARNING (at most 100 of them) and makes the
+ *   next call start a new subtransaction; tuples saved so far are kept.
+ *   When the buffer is full, or the input ends while tuples are saved, the
+ *   subtransaction is released and replaying starts.
+ * - Replaying: each call deforms the next saved tuple into values/nulls and
+ *   returns with skip_row unset, so CopyFrom() inserts it.  Once every saved
+ *   tuple has been handed out they are freed and filling resumes.
+ *
+ * Returns false only when the input is exhausted and nothing is left to
+ * replay, after reporting the total number of errors.
+ *
+ * NOTE(review): replay_buffer was allocated for replay_buffer_size entries
+ * when the COPY started, but the bound below re-reads the (PGC_USERSET) GUC;
+ * raising it while the COPY runs would overrun the buffer.  The allocated
+ * size should be kept in SafeCopyFromState instead.
+ */
+bool
+safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+	SafeCopyFromState *safecstate = cstate->safecstate;
+	MemoryContext entrycxt = CurrentMemoryContext;
+
+	/* Assigned inside PG_TRY() and read after it, hence volatile */
+	volatile bool valid_row = true;
+
+	safecstate->skip_row = false;
+
+	PG_TRY();
+	{
+		if (!safecstate->replay_is_active)
+		{
+			if (safecstate->begin_subtransaction)
+			{
+				BeginInternalSubTransaction(NULL);
+				/* NOTE(review): resources acquired while parsing stay with oldowner */
+				CurrentResourceOwner = safecstate->oldowner;
+				/* Parse in the caller's context, not the subtransaction's */
+				MemoryContextSwitchTo(entrycxt);
+
+				safecstate->begin_subtransaction = false;
+			}
+
+			if (safecstate->saved_tuples < replay_buffer_size)
+			{
+				valid_row = NextCopyFrom(cstate, econtext, values, nulls);
+				if (valid_row)
+				{
+					/* Fill replay_buffer in CopyFrom() oldcontext */
+					MemoryContext cxt = MemoryContextSwitchTo(safecstate->oldcontext);
+
+					safecstate->replay_buffer[safecstate->saved_tuples++] = heap_form_tuple(RelationGetDescr(cstate->rel), values, nulls);
+					MemoryContextSwitchTo(cxt);
+
+					safecstate->skip_row = true;
+				}
+				else if (!safecstate->processed_remaining_tuples)
+				{
+					/* End of input: commit the parsing subtransaction */
+					ReleaseCurrentSubTransaction();
+					CurrentResourceOwner = safecstate->oldowner;
+					MemoryContextSwitchTo(entrycxt);
+
+					if (safecstate->replayed_tuples < safecstate->saved_tuples)
+					{
+						/*
+						 * Prepare to replay the remaining tuples.  Report a valid
+						 * row instead of returning from here: leaving a PG_TRY()
+						 * block other than through PG_END_TRY() would leave
+						 * PG_exception_stack pointing at a dead stack frame.
+						 */
+						safecstate->replay_is_active = true;
+						safecstate->processed_remaining_tuples = true;
+						safecstate->skip_row = true;
+						valid_row = true;
+					}
+				}
+			}
+			else
+			{
+				/* Buffer was filled, commit subtransaction and prepare to replay */
+				ReleaseCurrentSubTransaction();
+				CurrentResourceOwner = safecstate->oldowner;
+				MemoryContextSwitchTo(entrycxt);
+
+				safecstate->replay_is_active = true;
+				safecstate->begin_subtransaction = true;
+				safecstate->skip_row = true;
+			}
+		}
+		else if (safecstate->replayed_tuples < safecstate->saved_tuples)
+		{
+			/* Replaying tuple */
+			heap_deform_tuple(safecstate->replay_buffer[safecstate->replayed_tuples++], RelationGetDescr(cstate->rel), values, nulls);
+		}
+		else
+		{
+			/*
+			 * Every saved tuple was handed out, and processed by CopyFrom(), in
+			 * an earlier call.  Free them now; otherwise they would pile up in
+			 * CopyFrom()'s memory context for the whole COPY.
+			 */
+			for (int i = 0; i < safecstate->saved_tuples; i++)
+				heap_freetuple(safecstate->replay_buffer[i]);
+			safecstate->saved_tuples = safecstate->replayed_tuples = 0;
+
+			safecstate->replay_is_active = false;
+			safecstate->skip_row = true;
+		}
+	}
+	PG_CATCH();
+	{
+		ErrorData  *errdata;
+
+		/* Save the error out of ErrorContext, then abort the subtransaction */
+		MemoryContextSwitchTo(safecstate->oldcontext);
+		errdata = CopyErrorData();
+		FlushErrorState();
+
+		RollbackAndReleaseCurrentSubTransaction();
+		CurrentResourceOwner = safecstate->oldowner;
+		MemoryContextSwitchTo(entrycxt);
+
+		switch (errdata->sqlerrcode)
+		{
+			/* Ignore malformed data */
+			case ERRCODE_DATA_EXCEPTION:
+			case ERRCODE_ARRAY_ELEMENT_ERROR:
+			case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE:
+			case ERRCODE_INTERVAL_FIELD_OVERFLOW:
+			case ERRCODE_INVALID_CHARACTER_VALUE_FOR_CAST:
+			case ERRCODE_INVALID_DATETIME_FORMAT:
+			case ERRCODE_INVALID_ESCAPE_CHARACTER:
+			case ERRCODE_INVALID_ESCAPE_SEQUENCE:
+			case ERRCODE_NONSTANDARD_USE_OF_ESCAPE_CHARACTER:
+			case ERRCODE_INVALID_PARAMETER_VALUE:
+			case ERRCODE_INVALID_TABLESAMPLE_ARGUMENT:
+			case ERRCODE_INVALID_TIME_ZONE_DISPLACEMENT_VALUE:
+			case ERRCODE_NULL_VALUE_NOT_ALLOWED:
+			case ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE:
+			case ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED:
+			case ERRCODE_STRING_DATA_LENGTH_MISMATCH:
+			case ERRCODE_STRING_DATA_RIGHT_TRUNCATION:
+			case ERRCODE_INVALID_TEXT_REPRESENTATION:
+			case ERRCODE_INVALID_BINARY_REPRESENTATION:
+			case ERRCODE_BAD_COPY_FILE_FORMAT:
+			case ERRCODE_UNTRANSLATABLE_CHARACTER:
+			case ERRCODE_DUPLICATE_JSON_OBJECT_KEY_VALUE:
+			case ERRCODE_INVALID_ARGUMENT_FOR_SQL_JSON_DATETIME_FUNCTION:
+			case ERRCODE_INVALID_JSON_TEXT:
+			case ERRCODE_INVALID_SQL_JSON_SUBSCRIPT:
+			case ERRCODE_MORE_THAN_ONE_SQL_JSON_ITEM:
+			case ERRCODE_NO_SQL_JSON_ITEM:
+			case ERRCODE_NON_NUMERIC_SQL_JSON_ITEM:
+			case ERRCODE_NON_UNIQUE_KEYS_IN_A_JSON_OBJECT:
+			case ERRCODE_SINGLETON_SQL_JSON_ITEM_REQUIRED:
+			case ERRCODE_SQL_JSON_ARRAY_NOT_FOUND:
+			case ERRCODE_SQL_JSON_MEMBER_NOT_FOUND:
+			case ERRCODE_SQL_JSON_NUMBER_NOT_FOUND:
+			case ERRCODE_SQL_JSON_OBJECT_NOT_FOUND:
+			case ERRCODE_TOO_MANY_JSON_ARRAY_ELEMENTS:
+			case ERRCODE_TOO_MANY_JSON_OBJECT_MEMBERS:
+			case ERRCODE_SQL_JSON_SCALAR_REQUIRED:
+			case ERRCODE_SQL_JSON_ITEM_CANNOT_BE_CAST_TO_TARGET_TYPE:
+				safecstate->errors++;
+				if (safecstate->errors <= 100)
+					ereport(WARNING,
+							(errcode(errdata->sqlerrcode),
+							 errmsg("%s", errdata->context)));
+
+				safecstate->begin_subtransaction = true;
+				safecstate->skip_row = true;
+				break;
+			default:
+				/* Not an ignorable data error: propagate it unchanged */
+				ReThrowError(errdata);
+		}
+
+		FreeErrorData(errdata);
+	}
+	PG_END_TRY();
+
+	if (!valid_row)
+	{
+		if (safecstate->errors == 0)
+			ereport(NOTICE,
+					errmsg("FIND %d ERRORS", safecstate->errors));
+		else if (safecstate->errors == 1)
+			ereport(WARNING,
+					errmsg("FIND %d ERROR", safecstate->errors));
+		else
+			ereport(WARNING,
+					errmsg("FIND %d ERRORS", safecstate->errors));
+
+		return false;
+	}
+
+	return true;
+}
+
 /*
  * Read the next input line and stash it in line_buf.
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index df5ceea910..3bb7235b34 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -800,7 +800,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	HANDLER HAVING HEADER_P HOLD HOUR_P
 
-	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
+	IDENTITY_P IF_P IGNORE_ERRORS ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
 	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
 	INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
 	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
@@ -3456,6 +3456,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("freeze", (Node *) makeBoolean(true), @1);
 				}
+			| IGNORE_ERRORS
+				{
+					$$ = makeDefElem("ignore_errors", (Node *) makeBoolean(true), @1);
+				}
 			| DELIMITER opt_as Sconst
 				{
 					$$ = makeDefElem("delimiter", (Node *) makeString($3), @1);
@@ -17814,6 +17818,7 @@ unreserved_keyword:
 			| HOUR_P
 			| IDENTITY_P
 			| IF_P
+			| IGNORE_ERRORS
 			| IMMEDIATE
 			| IMMUTABLE
 			| IMPLICIT_P
@@ -18393,6 +18398,7 @@ bare_label_keyword:
 			| HOLD
 			| IDENTITY_P
 			| IF_P
+			| IGNORE_ERRORS
 			| ILIKE
 			| IMMEDIATE
 			| IMMUTABLE
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 0328029d43..54209a4a3c 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -49,6 +49,7 @@
 #include "catalog/pg_parameter_acl.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/copy.h"
 #include "commands/prepare.h"
 #include "commands/tablespace.h"
 #include "commands/trigger.h"
@@ -2527,6 +2528,16 @@ static struct config_int ConfigureNamesInt[] =
 		NULL, NULL, NULL
 	},
 
+	{
+		{"replay_buffer_size", PGC_USERSET, RESOURCES_MEM,
+			gettext_noop("Sets the number of rows buffered by COPY FROM with the IGNORE_ERRORS option."),
+			gettext_noop("Rows are parsed inside subtransactions and buffered before being inserted.")
+		},
+		&replay_buffer_size,
+		1000, 1, INT_MAX,
+		NULL, NULL, NULL
+	},
+
 	/*
 	 * We use the hopefully-safely-small value of 100kB as the compiled-in
 	 * default for max_stack_depth.  InitializeGUCOptions will increase it if
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index b4bc06e5f5..f4e777a0a3 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -155,6 +155,8 @@
 					#   mmap
 					# (change requires restart)
 #min_dynamic_shared_memory = 0MB	# (change requires restart)
+#replay_buffer_size = 1000		# min 1
+					# (rows buffered by COPY FROM IGNORE_ERRORS)
 
 # - Disk -
 
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index e572f585ef..feaf18b043 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2742,7 +2742,8 @@ psql_completion(const char *text, int start, int end)
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "("))
 		COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL",
 					  "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE",
-					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING");
+					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING",
+					  "IGNORE_ERRORS");
 
 	/* Complete COPY <sth> FROM|TO filename WITH (FORMAT */
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT"))
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index cb0096aeb6..fc9f559efe 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -19,6 +19,9 @@
 #include "parser/parse_node.h"
 #include "tcop/dest.h"
 
+/* User-settable GUC parameters */
+extern PGDLLIMPORT int replay_buffer_size;
+
 /*
  * Represents whether a header line should be present, and whether it must
  * match the actual names (which implies "true").
@@ -42,6 +45,7 @@ typedef struct CopyFormatOptions
 								 * -1 if not specified */
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
+	bool		ignore_errors;  /* ignore rows with errors */
 	bool		csv_mode;		/* Comma Separated Value format? */
 	CopyHeaderChoice header_line;	/* header line? */
 	char	   *null_print;		/* NULL marker string (server encoding!) */
@@ -78,6 +82,8 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 extern void EndCopyFrom(CopyFromState cstate);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 						 Datum *values, bool *nulls);
+extern bool safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+							 Datum *values, bool *nulls);
 extern bool NextCopyFromRawFields(CopyFromState cstate,
 								  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 3df1c5a97c..4227a7babd 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
 
 #include "commands/copy.h"
 #include "commands/trigger.h"
+#include "utils/resowner.h"
 
 /*
  * Represents the different source cases we need to worry about at
@@ -49,6 +50,23 @@ typedef enum CopyInsertMethod
 	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
+/* State for the COPY FROM IGNORE_ERRORS option; see safeNextCopyFrom(). */
+typedef struct SafeCopyFromState
+{
+	HeapTuple	   *replay_buffer;		/* parsed rows saved for later insertion */
+	int				saved_tuples;		/* # of tuples in replay_buffer */
+	int 			replayed_tuples;	/* # of saved tuples already handed back */
+	int				errors;				/* total # of errors */
+	bool			replay_is_active;	/* true while saved tuples are replayed */
+	bool			begin_subtransaction; /* next read must start a subtransaction */
+	bool			processed_remaining_tuples;	/* end of input already handled */
+	bool			skip_row; 			/* if true, CopyFrom() skips this row */
+
+	MemoryContext	oldcontext;			/* CopyFrom() context; holds replay_buffer */
+	ResourceOwner	oldowner;			/* CopyFrom() resource owner */
+	CopyInsertMethod insertMethod;	/* CopyFrom()'s insert method; not read in this patch */
+} SafeCopyFromState;
+
 /*
  * This struct contains all the state variables used throughout a COPY FROM
  * operation.
@@ -71,6 +89,7 @@ typedef struct CopyFromStateData
 	char	   *filename;		/* filename, or NULL for STDIN */
 	bool		is_program;		/* is 'filename' a program to popen? */
 	copy_data_source_cb data_source_cb; /* function for reading data */
+	SafeCopyFromState *safecstate; /* struct for ignore_errors option */
 
 	CopyFormatOptions opts;
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index ae35f03251..2af11bd359 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -201,6 +201,7 @@ PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("ignore_errors", IGNORE_ERRORS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("immediate", IMMEDIATE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("immutable", IMMUTABLE, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 5f3685e9ef..093c7958be 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -649,6 +649,136 @@ SELECT * FROM instead_of_insert_tbl;
 (2 rows)
 
 COMMIT;
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING:  FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[]);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING:  FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TRIGGER trig_before on check_ign_err;
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO check_ign_err VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS;
+WARNING:  COPY check_ign_err_view, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err_view, line 3: "3	{3}"
+WARNING:  COPY check_ign_err_view, line 4, column n: "a"
+WARNING:  COPY check_ign_err_view, line 5, column n: "5 {5} 5555555555"
+WARNING:  COPY check_ign_err_view, line 6, column n: ""
+WARNING:  COPY check_ign_err_view, line 7, column m: "{a, 7}"
+WARNING:  COPY check_ign_err_view, line 8, column n: "8 {8} 8"
+WARNING:  FIND 7 ERRORS
+SELECT * FROM check_ign_err_view;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+-- foreign table case in postgres_fdw extension
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+  WHERE n = floor(random()*(1-1+1))+1; /* find values equal 1 */
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING:  FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TABLE check_ign_err;
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int)
+  PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+  FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+  FOR VALUES FROM (4) TO (8);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column n: "5 {5} 5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  COPY check_ign_err, line 8, column n: "8 {8} 8"
+WARNING:  FIND 7 ERRORS
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index b3c16af48e..c91122aa1e 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -454,6 +454,122 @@ test1
 SELECT * FROM instead_of_insert_tbl;
 COMMIT;
 
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5 {5} 5555555555
+
+7	{a, 7}	7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[]);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5 {5} 5555555555
+
+7	{a, 7}	7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before on check_ign_err;
+
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO check_ign_err VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5 {5} 5555555555
+
+7	{a, 7}	7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err_view;
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+
+-- foreign table case in postgres_fdw extension
+
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+  WHERE n = floor(random()*(1-1+1))+1; /* find values equal 1 */
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5 {5} 5555555555
+
+7	{a, 7}	7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+DROP TABLE check_ign_err;
+
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int check (n < 8), m int[], k int)
+  PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+  FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+  FOR VALUES FROM (4) TO (8);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5 {5} 5555555555
+
+7	{a, 7}	7
+8 {8} 8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
-- 
2.25.1

Reply via email to