Thank you for reviewing.
In the previous patch there was an error when processing constraints. The
patch was fixed, but the code grew up and became more complicated
(0005-COPY_IGNORE_ERRORS). I also simplified the logic of
safeNextCopyFrom().
You asked why we need subtransactions, so the answer is in this patch. When
processing a row that does not satisfy constraints or INSTEAD OF triggers,
it is necessary to rollback the subtransaction and return the table to its
original state.
Cause of complexity, I had to abandon the constraints, triggers processing
in and handle only errors that occur when reading the file. Attaching
simplified patch (0006-COPY_IGNORE_ERRORS).
Checked out these patches on all COPY regress tests and it worked correctly.


> BTW in v4 patch, data are loaded into the buffer one by one, and when
> the buffer fills up, the data in the buffer are 'replayed' also one by
> one, right?
> Wouldn't this have more overhead than a normal COPY?
>
The data is loaded into the buffer one by one, but in the "replay mode"
ignore_errors works as standard COPY. Tuples add to the slot and depending
on the type of the slot (CIM_SINGLE or CIM_MULTI) tuples are replayed in
the corresponding case.
For the 0006 patch you can imagine that we divide the loop for(;;) in 2
parts. The first part is adding tuples to the buffer and the second part is
inserting tuples to the table. These parts don't intersect with each other
and are completed sequentially.
The main idea of replay_buffer is that it is needed for the CIM_SINGLE
case. You can implement the CIM_SINGLE case and see that tuples before an
error occurring don't add to the table. Logic of the 0005 patch is similar
but with some differences.

As a test, I COPYed slightly larger data with and without ignore_errors
> option.
> There might be other reasons, but I found a performance difference.

Tried to reduce performance difference with cleaning up replay_buffer with
resetting the new context for replay_buffer - replay_cxt.
```
Before:
Without ignore_errors:
COPY 10000000
Time: 15538,579 ms (00:15,539)
With ignore_errors:
COPY 10000000
Time: 21289,121 ms (00:21,289)

After:
Without ignore_errors:
COPY 10000000
Time: 15318,922 ms (00:15,319)
With ignore_errors:
COPY 10000000
Time: 19868,175 ms (00:19,868)
```

 - Put in the documentation that the warnings will not be output for more
> than 101 cases.
>
Yeah, I point it out in the doc.


> I applied v4 patch and when canceled the COPY, there was a case I found
> myself left in a transaction.
> Should this situation be prevented from occurring?
>
> ```
> =# copy test from '/tmp/10000000.data' with (ignore_errors );
>
> ^CCancel request sent
> ERROR:  canceling statement due to user request
>
> =# truncate test;
> ERROR:  current transaction is aborted, commands ignored until end of
> transaction block
> ```
>
Tried to implement your error and could not. The result was the same as
COPY FROM implements.
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..c99adabcc9 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
 
     FORMAT <replaceable class="parameter">format_name</replaceable>
     FREEZE [ <replaceable class="parameter">boolean</replaceable> ]
+    IGNORE_ERRORS [ <replaceable class="parameter">boolean</replaceable> ]
     DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>'
     NULL '<replaceable class="parameter">null_string</replaceable>'
     HEADER [ <replaceable class="parameter">boolean</replaceable> | MATCH ]
@@ -233,6 +234,20 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>IGNORE_ERRORS</literal></term>
+    <listitem>
+     <para>
+      Drops rows that contain malformed data while copying. These are rows
+      containing syntax errors in data, rows with too many or too few columns,
+      rows that result in constraint violations, rows containing columns where
+      the data type's input function raises an error.
+      Outputs warnings about rows with incorrect data (the number of warnings
+      is not more than 100) and the total number of errors.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>DELIMITER</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..f41b25f26a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -406,6 +406,7 @@ ProcessCopyOptions(ParseState *pstate,
 				   bool is_from,
 				   List *options)
 {
+	bool		ignore_errors_specified = false;
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
@@ -448,6 +449,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+				errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index e8bb168aea..6474d47d29 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -106,6 +106,12 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+static bool safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+							 Datum *values, bool *nulls);
+static bool safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate);
+
+static void addToReplayBuffer(CopyFromState cstate, TupleTableSlot *myslot);
+
 /*
  * error context callback for COPY FROM
  *
@@ -521,6 +527,254 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
 	miinfo->bufferedBytes += tuplen;
 }
 
+/*
+ * Analog of NextCopyFrom() but ignores rows with errors while copying.
+ */
+bool
+safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+	SafeCopyFromState *sfcstate = cstate->sfcstate;
+
+	sfcstate->tuple_is_valid = false;
+
+	PG_TRY();
+	{
+		if (sfcstate->begin_subxact)
+		{
+			BeginInternalSubTransaction(NULL);
+			CurrentResourceOwner = sfcstate->oldowner;
+
+			sfcstate->begin_subxact = false;
+		}
+
+		if (!sfcstate->replay_is_active)
+		{
+			if (sfcstate->saved_tuples < REPLAY_BUFFER_SIZE)
+			{
+				MemoryContext cxt = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+				bool valid_row = NextCopyFrom(cstate, econtext, values, nulls);
+
+				CurrentMemoryContext = cxt;
+
+				sfcstate->tuple_is_valid = true;
+
+				if (!valid_row && sfcstate->replayed_tuples < sfcstate->saved_tuples)
+				{
+					/* Prepare for replaying remaining tuples if they exist */
+					sfcstate->replay_is_active = true;
+
+					/* If there are insteadof triggers we should rollback subtransaction */
+					if (sfcstate->has_instead_insert_row_trig)
+					{
+						RollbackAndReleaseCurrentSubTransaction();
+						CurrentResourceOwner = sfcstate->oldowner;
+
+						sfcstate->begin_subxact = true;
+					}
+				}
+				else if (!valid_row)
+				{
+					ReleaseCurrentSubTransaction();
+					CurrentResourceOwner = sfcstate->oldowner;
+
+					if (sfcstate->errors == 0)
+						ereport(NOTICE,
+								errmsg("%d errors", sfcstate->errors));
+					else if (sfcstate->errors == 1)
+						ereport(WARNING,
+								errmsg("%d error", sfcstate->errors));
+					else
+						ereport(WARNING,
+								errmsg("%d errors", sfcstate->errors));
+
+					return false;
+				}
+			}
+			else
+			{
+				/* Buffer was filled, prepare for replaying */
+				sfcstate->replay_is_active = true;
+
+				/* If there are insteadof triggers we should rollback subtransaction */
+				if (sfcstate->has_instead_insert_row_trig)
+				{
+					RollbackAndReleaseCurrentSubTransaction();
+					CurrentResourceOwner = sfcstate->oldowner;
+
+					sfcstate->begin_subxact = true;
+				}
+			}
+		}
+
+		if (sfcstate->replay_is_active)
+		{
+			if (sfcstate->replayed_tuples < sfcstate->saved_tuples)
+			{
+				/* Replaying the tuple */
+				MemoryContext cxt = MemoryContextSwitchTo(sfcstate->replay_cxt);
+
+				heap_deform_tuple(sfcstate->replay_buffer[sfcstate->replayed_tuples++], RelationGetDescr(cstate->rel), values, nulls);
+				MemoryContextSwitchTo(cxt);
+
+				sfcstate->tuple_is_valid = true;
+			}
+			else
+			{
+				/* Clean up replay_buffer */
+				MemoryContextReset(sfcstate->replay_cxt);
+				sfcstate->saved_tuples = sfcstate->replayed_tuples = 0;
+
+				cstate->sfcstate->replay_buffer = MemoryContextAlloc(cstate->sfcstate->replay_cxt,
+												  REPLAY_BUFFER_SIZE * sizeof(HeapTuple));
+
+				ReleaseCurrentSubTransaction();
+				CurrentResourceOwner = sfcstate->oldowner;
+
+				sfcstate->begin_subxact = true;
+				sfcstate->replay_is_active = false;
+			}
+		}
+	}
+	PG_CATCH();
+	{
+		MemoryContext ecxt = MemoryContextSwitchTo(sfcstate->oldcontext);
+		ErrorData *errdata = CopyErrorData();
+
+		RollbackAndReleaseCurrentSubTransaction();
+		CurrentResourceOwner = sfcstate->oldowner;
+
+		switch (errdata->sqlerrcode)
+		{
+			/* Ignore data exceptions */
+			case ERRCODE_CHARACTER_NOT_IN_REPERTOIRE:
+			case ERRCODE_DATA_EXCEPTION:
+			case ERRCODE_ARRAY_ELEMENT_ERROR:
+			case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE:
+			case ERRCODE_INTERVAL_FIELD_OVERFLOW:
+			case ERRCODE_INVALID_CHARACTER_VALUE_FOR_CAST:
+			case ERRCODE_INVALID_DATETIME_FORMAT:
+			case ERRCODE_INVALID_ESCAPE_CHARACTER:
+			case ERRCODE_INVALID_ESCAPE_SEQUENCE:
+			case ERRCODE_NONSTANDARD_USE_OF_ESCAPE_CHARACTER:
+			case ERRCODE_INVALID_PARAMETER_VALUE:
+			case ERRCODE_INVALID_TABLESAMPLE_ARGUMENT:
+			case ERRCODE_INVALID_TIME_ZONE_DISPLACEMENT_VALUE:
+			case ERRCODE_NULL_VALUE_NOT_ALLOWED:
+			case ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE:
+			case ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED:
+			case ERRCODE_STRING_DATA_LENGTH_MISMATCH:
+			case ERRCODE_STRING_DATA_RIGHT_TRUNCATION:
+			case ERRCODE_INVALID_TEXT_REPRESENTATION:
+			case ERRCODE_INVALID_BINARY_REPRESENTATION:
+			case ERRCODE_BAD_COPY_FILE_FORMAT:
+			case ERRCODE_UNTRANSLATABLE_CHARACTER:
+			case ERRCODE_DUPLICATE_JSON_OBJECT_KEY_VALUE:
+			case ERRCODE_INVALID_ARGUMENT_FOR_SQL_JSON_DATETIME_FUNCTION:
+			case ERRCODE_INVALID_JSON_TEXT:
+			case ERRCODE_INVALID_SQL_JSON_SUBSCRIPT:
+			case ERRCODE_MORE_THAN_ONE_SQL_JSON_ITEM:
+			case ERRCODE_NO_SQL_JSON_ITEM:
+			case ERRCODE_NON_NUMERIC_SQL_JSON_ITEM:
+			case ERRCODE_NON_UNIQUE_KEYS_IN_A_JSON_OBJECT:
+			case ERRCODE_SINGLETON_SQL_JSON_ITEM_REQUIRED:
+			case ERRCODE_SQL_JSON_ARRAY_NOT_FOUND:
+			case ERRCODE_SQL_JSON_MEMBER_NOT_FOUND:
+			case ERRCODE_SQL_JSON_NUMBER_NOT_FOUND:
+			case ERRCODE_SQL_JSON_OBJECT_NOT_FOUND:
+			case ERRCODE_TOO_MANY_JSON_ARRAY_ELEMENTS:
+			case ERRCODE_TOO_MANY_JSON_OBJECT_MEMBERS:
+			case ERRCODE_SQL_JSON_SCALAR_REQUIRED:
+			case ERRCODE_SQL_JSON_ITEM_CANNOT_BE_CAST_TO_TARGET_TYPE:
+				sfcstate->errors++;
+				if (sfcstate->errors <= 100)
+					ereport(WARNING,
+							(errcode(errdata->sqlerrcode),
+							errmsg("%s", errdata->context)));
+
+				sfcstate->begin_subxact = true;
+
+				break;
+			default:
+				PG_RE_THROW();
+		}
+
+		FlushErrorState();
+		FreeErrorData(errdata);
+		errdata = NULL;
+
+		MemoryContextSwitchTo(ecxt);
+	}
+	PG_END_TRY();
+
+	return true;
+}
+
+/*
+ * Analog of ExecConstraints(), but ignores rows in constraint violations.
+ */
+bool
+safeExecConstraints(CopyFromState cstate, ResultRelInfo *resultRelInfo, TupleTableSlot *myslot, EState *estate)
+{
+	SafeCopyFromState *sfcstate = cstate->sfcstate;
+
+	PG_TRY();
+		ExecConstraints(resultRelInfo, myslot, estate);
+	PG_CATCH();
+	{
+		MemoryContext ecxt = MemoryContextSwitchTo(sfcstate->oldcontext);
+		ErrorData *errdata = CopyErrorData();
+
+		RollbackAndReleaseCurrentSubTransaction();
+		CurrentResourceOwner = sfcstate->oldowner;
+
+		switch (errdata->sqlerrcode)
+		{
+			/* Ignore constraint violations */
+			case ERRCODE_INTEGRITY_CONSTRAINT_VIOLATION:
+			case ERRCODE_RESTRICT_VIOLATION:
+			case ERRCODE_NOT_NULL_VIOLATION:
+			case ERRCODE_FOREIGN_KEY_VIOLATION:
+			case ERRCODE_UNIQUE_VIOLATION:
+			case ERRCODE_CHECK_VIOLATION:
+			case ERRCODE_EXCLUSION_VIOLATION:
+				sfcstate->errors++;
+				if (sfcstate->errors <= 100)
+					ereport(WARNING,
+							(errcode(errdata->sqlerrcode),
+							errmsg("%s %s", errdata->message, errdata->detail)));
+
+				sfcstate->begin_subxact = true;
+				sfcstate->tuple_is_valid = false;
+
+				break;
+			default:
+				PG_RE_THROW();
+		}
+
+		FlushErrorState();
+		FreeErrorData(errdata);
+		errdata = NULL;
+
+		MemoryContextSwitchTo(ecxt);
+	}
+	PG_END_TRY();
+
+	if (!sfcstate->tuple_is_valid)
+		return false;
+
+	return true;
+}
+
+void
+addToReplayBuffer(CopyFromState cstate, TupleTableSlot *myslot)
+{
+	MemoryContext cxt = MemoryContextSwitchTo(cstate->sfcstate->replay_cxt);
+	HeapTuple saved_tuple = heap_form_tuple(RelationGetDescr(cstate->rel), myslot->tts_values, myslot->tts_isnull);
+
+	cstate->sfcstate->replay_buffer[cstate->sfcstate->saved_tuples++] = saved_tuple;
+	MemoryContextSwitchTo(cxt);
+}
+
 /*
  * Copy FROM file to relation.
  */
@@ -535,6 +789,7 @@ CopyFrom(CopyFromState cstate)
 	ExprContext *econtext;
 	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
+	ResourceOwner oldowner = CurrentResourceOwner;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
@@ -819,6 +1074,27 @@ CopyFrom(CopyFromState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	/* Initialize safeCopyFromState for IGNORE_ERRORS option */
+	if (cstate->opts.ignore_errors)
+	{
+		cstate->sfcstate = palloc(sizeof(SafeCopyFromState));
+
+		cstate->sfcstate->replay_cxt = AllocSetContextCreate(oldcontext,
+									   "Replay context",
+									   ALLOCSET_DEFAULT_SIZES);
+		cstate->sfcstate->replay_buffer = MemoryContextAlloc(cstate->sfcstate->replay_cxt,
+						   REPLAY_BUFFER_SIZE * sizeof(HeapTuple));
+		cstate->sfcstate->saved_tuples = 0;
+		cstate->sfcstate->replayed_tuples = 0;
+		cstate->sfcstate->errors = 0;
+		cstate->sfcstate->replay_is_active = false;
+		cstate->sfcstate->begin_subxact = true;
+		cstate->sfcstate->oldowner = oldowner;
+		cstate->sfcstate->oldcontext = oldcontext;
+		if (has_instead_insert_row_trig)
+			cstate->sfcstate->has_instead_insert_row_trig = true;
+	}
+
 	for (;;)
 	{
 		TupleTableSlot *myslot;
@@ -855,19 +1131,25 @@ CopyFrom(CopyFromState cstate)
 
 		ExecClearTuple(myslot);
 
-		/* Directly store the values/nulls array in the slot */
-		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
-			break;
+		if (cstate->sfcstate)
+		{
+			/* If option IGNORE_ERRORS is enabled, COPY skips rows with errors */
+			if (!safeNextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+				break;
+			if (!cstate->sfcstate->tuple_is_valid)
+				continue;
+		}
+		else
+		{
+			/* Directly store the values/nulls array in the slot */
+			if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+				break;
+		}
 
 		ExecStoreVirtualTuple(myslot);
 
-		/*
-		 * Constraints and where clause might reference the tableoid column,
-		 * so (re-)initialize tts_tableOid before evaluating them.
-		 */
 		myslot->tts_tableOid = RelationGetRelid(target_resultRelInfo->ri_RelationDesc);
 
-		/* Triggers and stuff need to be invoked in query context. */
 		MemoryContextSwitchTo(oldcontext);
 
 		if (cstate->whereClause)
@@ -1020,6 +1302,13 @@ CopyFrom(CopyFromState cstate)
 			if (has_instead_insert_row_trig)
 			{
 				ExecIRInsertTriggers(estate, resultRelInfo, myslot);
+
+				/* Add tuple to replay_buffer if IGNORE_ERRORS is enabled */
+				if (cstate->sfcstate && !cstate->sfcstate->replay_is_active)
+				{
+					addToReplayBuffer(cstate, myslot);
+					continue;
+				}
 			}
 			else
 			{
@@ -1035,7 +1324,15 @@ CopyFrom(CopyFromState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, myslot, estate);
+				{
+					if (cstate->sfcstate)
+					{
+						if (!safeExecConstraints(cstate, resultRelInfo, myslot, estate))
+							continue;
+					}
+					else
+						ExecConstraints(resultRelInfo, myslot, estate);
+				}
 
 				/*
 				 * Also check the tuple against the partition constraint, if
@@ -1047,6 +1344,13 @@ CopyFrom(CopyFromState cstate)
 					(proute == NULL || has_before_insert_row_trig))
 					ExecPartitionCheck(resultRelInfo, myslot, estate, true);
 
+				/* Add tuple to replay_buffer if IGNORE_ERRORS is enabled */
+				if (cstate->sfcstate && !cstate->sfcstate->replay_is_active)
+				{
+					addToReplayBuffer(cstate, myslot);
+					continue;
+				}
+
 				/* Store the slot in the multi-insert buffer, when enabled. */
 				if (insertMethod == CIM_MULTI || leafpart_use_multi_insert)
 				{
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b5ab9d9c9a..b49954c0aa 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -808,7 +808,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	HANDLER HAVING HEADER_P HOLD HOUR_P
 
-	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
+	IDENTITY_P IF_P IGNORE_ERRORS ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
 	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
 	INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
 	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
@@ -3477,6 +3477,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("freeze", (Node *) makeBoolean(true), @1);
 				}
+			| IGNORE_ERRORS
+				{
+					$$ = makeDefElem("ignore_errors", (Node *)makeBoolean(true), @1);
+				}
 			| DELIMITER opt_as Sconst
 				{
 					$$ = makeDefElem("delimiter", (Node *) makeString($3), @1);
@@ -17778,6 +17782,7 @@ unreserved_keyword:
 			| HOUR_P
 			| IDENTITY_P
 			| IF_P
+			| IGNORE_ERRORS
 			| IMMEDIATE
 			| IMMUTABLE
 			| IMPLICIT_P
@@ -18357,6 +18362,7 @@ bare_label_keyword:
 			| HOLD
 			| IDENTITY_P
 			| IF_P
+			| IGNORE_ERRORS
 			| ILIKE
 			| IMMEDIATE
 			| IMMUTABLE
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 62a39779b9..fe590ff7a8 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2748,7 +2748,8 @@ psql_completion(const char *text, int start, int end)
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "("))
 		COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL",
 					  "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE",
-					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING");
+					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING",
+					  "IGNORE_ERRORS");
 
 	/* Complete COPY <sth> FROM|TO filename WITH (FORMAT */
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT"))
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index cb0096aeb6..2b696f99bc 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -42,6 +42,7 @@ typedef struct CopyFormatOptions
 								 * -1 if not specified */
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
+	bool		ignore_errors;  /* ignore rows with errors */
 	bool		csv_mode;		/* Comma Separated Value format? */
 	CopyHeaderChoice header_line;	/* header line? */
 	char	   *null_print;		/* NULL marker string (server encoding!) */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e37c6032ae..5615fa55ef 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
 
 #include "commands/copy.h"
 #include "commands/trigger.h"
+#include "utils/resowner.h"
 
 /*
  * Represents the different source cases we need to worry about at
@@ -49,6 +50,24 @@ typedef enum CopyInsertMethod
 	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
+/* Struct that holding fields for ignore_errors option. */
+typedef struct SafeCopyFromState
+{
+#define			REPLAY_BUFFER_SIZE 10
+	HeapTuple	   *replay_buffer; 			/* accumulates tuples for replaying it after an error */
+	int				saved_tuples;			/* # of tuples in replay_buffer */
+	int 			replayed_tuples;		/* # of tuples was replayed from buffer */
+	int				errors;					/* total # of errors */
+	bool			replay_is_active;		/* if true we replay tuples from buffer */
+	bool			begin_subxact;			/* if true we can begin subtransaction */
+	bool			tuple_is_valid;
+	bool			has_instead_insert_row_trig;
+
+	MemoryContext	replay_cxt;
+	MemoryContext	oldcontext;
+	ResourceOwner	oldowner;
+} SafeCopyFromState;
+
 /*
  * This struct contains all the state variables used throughout a COPY FROM
  * operation.
@@ -71,6 +90,7 @@ typedef struct CopyFromStateData
 	char	   *filename;		/* filename, or NULL for STDIN */
 	bool		is_program;		/* is 'filename' a program to popen? */
 	copy_data_source_cb data_source_cb; /* function for reading data */
+	SafeCopyFromState *sfcstate; /* struct for ignore_errors option */
 
 	CopyFormatOptions opts;
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index ae35f03251..2af11bd359 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -201,6 +201,7 @@ PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("ignore_errors", IGNORE_ERRORS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("immediate", IMMEDIATE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("immutable", IMMUTABLE, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 5f3685e9ef..d74575fd40 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -649,6 +649,134 @@ SELECT * FROM instead_of_insert_tbl;
 (2 rows)
 
 COMMIT;
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int check (n != 6), m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  new row for relation "check_ign_err" violates check constraint "check_ign_err_n_check" Failing row contains (6, {6}, 6).
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 8 | {8} | 8
+(2 rows)
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[], k int);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+  FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  new row for relation "check_ign_err" violates check constraint "check_ign_err_n_check" Failing row contains (6, {6}, 6).
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 8 | {8} | 8
+(2 rows)
+
+DROP TRIGGER trig_before on check_ign_err;
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+  FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err_view, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err_view, line 3: "3	{3}"
+WARNING:  COPY check_ign_err_view, line 4, column n: "a"
+WARNING:  COPY check_ign_err_view, line 5, column k: "5555555555"
+WARNING:  COPY check_ign_err_view, line 7, column m: "{a, 7}"
+WARNING:  5 errors
+SELECT * FROM trig_test;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 6 | {6} | 6
+ 8 | {8} | 8
+(3 rows)
+
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+-- foreign table case is in postgres_fdw extension
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+  WHERE n = floor(random()*(1-1+1))+1; /* finds values equal 1 */
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  5 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TABLE check_ign_err;
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int check (n != 6), m int[], k int)
+  PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+  FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+  FOR VALUES FROM (4) TO (9);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+  FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  new row for relation "check_ign_err_part2" violates check constraint "check_ign_err_n_check" Failing row contains (6, {6}, 6).
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 8 | {8} | 8
+(2 rows)
+
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index b3c16af48e..8d29ceba26 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -454,6 +454,122 @@ test1
 SELECT * FROM instead_of_insert_tbl;
 COMMIT;
 
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int check (n != 6), m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+6	{6}	6
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[], k int);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+  FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+6	{6}	6
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before on check_ign_err;
+
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+  FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+6	{6}	6
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM trig_test;
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+
+-- foreign table case is in postgres_fdw extension
+
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+  WHERE n = floor(random()*(1-1+1))+1; /* finds values equal 1 */
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+6	{6}	6
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+DROP TABLE check_ign_err;
+
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int check (n != 6), m int[], k int)
+  PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+  FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+  FOR VALUES FROM (4) TO (9);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+  FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+6	{6}	6
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index c25b52d0cb..22c992e6f6 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -34,6 +34,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
 
     FORMAT <replaceable class="parameter">format_name</replaceable>
     FREEZE [ <replaceable class="parameter">boolean</replaceable> ]
+    IGNORE_ERRORS [ <replaceable class="parameter">boolean</replaceable> ]
     DELIMITER '<replaceable class="parameter">delimiter_character</replaceable>'
     NULL '<replaceable class="parameter">null_string</replaceable>'
     HEADER [ <replaceable class="parameter">boolean</replaceable> | MATCH ]
@@ -233,6 +234,19 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>IGNORE_ERRORS</literal></term>
+    <listitem>
+     <para>
+      Drops rows that contain malformed data while copying. These are rows
+      containing syntax errors in data, rows with too many or too few columns,
+      rows containing columns where the data type's input function raises an error.
+      Outputs warnings about rows with incorrect data (the number of warnings
+      is not more than 100) and the total number of errors.
+     </para>
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>DELIMITER</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 49924e476a..f41b25f26a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -406,6 +406,7 @@ ProcessCopyOptions(ParseState *pstate,
 				   bool is_from,
 				   List *options)
 {
+	bool		ignore_errors_specified = false;
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
@@ -448,6 +449,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "ignore_errors") == 0)
+		{
+			if (ignore_errors_specified)
+				errorConflictingDefElem(defel, pstate);
+			ignore_errors_specified = true;
+			opts_out->ignore_errors = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index e8bb168aea..39f1dca084 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -535,6 +535,7 @@ CopyFrom(CopyFromState cstate)
 	ExprContext *econtext;
 	TupleTableSlot *singleslot = NULL;
 	MemoryContext oldcontext = CurrentMemoryContext;
+	ResourceOwner oldowner = CurrentResourceOwner;
 
 	PartitionTupleRouting *proute = NULL;
 	ErrorContextCallback errcallback;
@@ -819,6 +820,26 @@ CopyFrom(CopyFromState cstate)
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
+	/* Initialize safeCopyFromState for IGNORE_ERRORS option*/
+	if (cstate->opts.ignore_errors)
+	{
+		cstate->sfcstate = palloc(sizeof(SafeCopyFromState));
+
+		cstate->sfcstate->replay_cxt = AllocSetContextCreate(oldcontext,
+									   "Replay_context",
+									   ALLOCSET_DEFAULT_SIZES);
+		cstate->sfcstate->replay_buffer = MemoryContextAlloc(cstate->sfcstate->replay_cxt,
+										  REPLAY_BUFFER_SIZE * sizeof(HeapTuple));
+		cstate->sfcstate->saved_tuples = 0;
+		cstate->sfcstate->replayed_tuples = 0;
+		cstate->sfcstate->errors = 0;
+		cstate->sfcstate->replay_is_active = false;
+		cstate->sfcstate->begin_subxact = true;
+
+		cstate->sfcstate->oldowner = oldowner;
+		cstate->sfcstate->oldcontext = oldcontext;
+	}
+
 	for (;;)
 	{
 		TupleTableSlot *myslot;
@@ -855,9 +876,18 @@ CopyFrom(CopyFromState cstate)
 
 		ExecClearTuple(myslot);
 
-		/* Directly store the values/nulls array in the slot */
-		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
-			break;
+		if (cstate->sfcstate)
+		{
+			/* If option IGNORE_ERRORS is enabled, COPY skips rows with errors */
+			if (!safeNextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+				break;
+			if (!cstate->sfcstate->replay_is_active)
+				continue;
+		}
+		else
+			/* Directly store the values/nulls array in the slot */
+			if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
+				break;
 
 		ExecStoreVirtualTuple(myslot);
 
@@ -1035,7 +1065,21 @@ CopyFrom(CopyFromState cstate)
 				 */
 				if (resultRelInfo->ri_FdwRoutine == NULL &&
 					resultRelInfo->ri_RelationDesc->rd_att->constr)
-					ExecConstraints(resultRelInfo, myslot, estate);
+				{
+					if (cstate->opts.ignore_errors)
+					{
+						PG_TRY();
+							ExecConstraints(resultRelInfo, myslot, estate);
+						PG_CATCH();
+							RollbackAndReleaseCurrentSubTransaction();
+							CurrentResourceOwner = cstate->sfcstate->oldowner;
+
+							PG_RE_THROW();
+						PG_END_TRY();
+					}
+					else
+						ExecConstraints(resultRelInfo, myslot, estate);
+				}
 
 				/*
 				 * Also check the tuple against the partition constraint, if
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index 7cf3e865cf..6b92e781e3 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -839,6 +839,169 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	return true;
 }
 
+/*
+ * Analog of NextCopyFrom() but ignores rows with errors while copying.
+ */
+bool
+safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext, Datum *values, bool *nulls)
+{
+	SafeCopyFromState *sfcstate = cstate->sfcstate;
+
+	PG_TRY();
+	{
+		if (sfcstate->begin_subxact)
+		{
+			BeginInternalSubTransaction(NULL);
+			CurrentResourceOwner = sfcstate->oldowner;
+
+			sfcstate->begin_subxact = false;
+		}
+
+		if (!sfcstate->replay_is_active)
+		{
+			if (sfcstate->saved_tuples < REPLAY_BUFFER_SIZE)
+			{
+				MemoryContext cxt = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
+				bool valid_row = NextCopyFrom(cstate, econtext, values, nulls);
+
+				CurrentMemoryContext = cxt;
+
+				if (valid_row)
+				{
+					/* Filling replay_buffer in Replay_context */
+					MemoryContext cxt = MemoryContextSwitchTo(sfcstate->replay_cxt);
+					HeapTuple saved_tuple = heap_form_tuple(RelationGetDescr(cstate->rel), values, nulls);
+
+					sfcstate->replay_buffer[sfcstate->saved_tuples++] = saved_tuple;
+					MemoryContextSwitchTo(cxt);
+				}
+				else if (sfcstate->replayed_tuples < sfcstate->saved_tuples)
+					/* Prepare for replaying remaining tuples if they exist */
+					sfcstate->replay_is_active = true;
+				else
+				{
+					ReleaseCurrentSubTransaction();
+					CurrentResourceOwner = sfcstate->oldowner;
+
+					if (sfcstate->errors == 0)
+						ereport(NOTICE,
+								errmsg("%d errors", sfcstate->errors));
+					else if (sfcstate->errors == 1)
+						ereport(WARNING,
+								errmsg("%d error", sfcstate->errors));
+					else
+						ereport(WARNING,
+								errmsg("%d errors", sfcstate->errors));
+
+					return false;
+				}
+			}
+			else
+				/* Buffer was filled, prepare for replaying */
+				sfcstate->replay_is_active = true;
+		}
+
+		if (sfcstate->replay_is_active)
+		{
+			if (sfcstate->replayed_tuples < sfcstate->saved_tuples)
+			{
+				/* Replaying the tuple */
+				MemoryContext cxt = MemoryContextSwitchTo(sfcstate->replay_cxt);
+
+				heap_deform_tuple(sfcstate->replay_buffer[sfcstate->replayed_tuples++], RelationGetDescr(cstate->rel), values, nulls);
+				MemoryContextSwitchTo(cxt);
+			}
+			else
+			{
+				/* Clean up replay_buffer */
+				MemoryContextReset(sfcstate->replay_cxt);
+				sfcstate->saved_tuples = sfcstate->replayed_tuples = 0;
+
+				cstate->sfcstate->replay_buffer = MemoryContextAlloc(cstate->sfcstate->replay_cxt,
+												  REPLAY_BUFFER_SIZE * sizeof(HeapTuple));
+
+				ReleaseCurrentSubTransaction();
+				CurrentResourceOwner = sfcstate->oldowner;
+
+				sfcstate->begin_subxact = true;
+				sfcstate->replay_is_active = false;
+			}
+		}
+	}
+	PG_CATCH();
+	{
+		MemoryContext ecxt = MemoryContextSwitchTo(sfcstate->oldcontext);
+		ErrorData *errdata = CopyErrorData();
+
+		RollbackAndReleaseCurrentSubTransaction();
+		CurrentResourceOwner = sfcstate->oldowner;
+
+		switch (errdata->sqlerrcode)
+		{
+			/* Ignore data exceptions */
+			case ERRCODE_CHARACTER_NOT_IN_REPERTOIRE:
+			case ERRCODE_DATA_EXCEPTION:
+			case ERRCODE_ARRAY_ELEMENT_ERROR:
+			case ERRCODE_DATETIME_VALUE_OUT_OF_RANGE:
+			case ERRCODE_INTERVAL_FIELD_OVERFLOW:
+			case ERRCODE_INVALID_CHARACTER_VALUE_FOR_CAST:
+			case ERRCODE_INVALID_DATETIME_FORMAT:
+			case ERRCODE_INVALID_ESCAPE_CHARACTER:
+			case ERRCODE_INVALID_ESCAPE_SEQUENCE:
+			case ERRCODE_NONSTANDARD_USE_OF_ESCAPE_CHARACTER:
+			case ERRCODE_INVALID_PARAMETER_VALUE:
+			case ERRCODE_INVALID_TABLESAMPLE_ARGUMENT:
+			case ERRCODE_INVALID_TIME_ZONE_DISPLACEMENT_VALUE:
+			case ERRCODE_NULL_VALUE_NOT_ALLOWED:
+			case ERRCODE_NUMERIC_VALUE_OUT_OF_RANGE:
+			case ERRCODE_SEQUENCE_GENERATOR_LIMIT_EXCEEDED:
+			case ERRCODE_STRING_DATA_LENGTH_MISMATCH:
+			case ERRCODE_STRING_DATA_RIGHT_TRUNCATION:
+			case ERRCODE_INVALID_TEXT_REPRESENTATION:
+			case ERRCODE_INVALID_BINARY_REPRESENTATION:
+			case ERRCODE_BAD_COPY_FILE_FORMAT:
+			case ERRCODE_UNTRANSLATABLE_CHARACTER:
+			case ERRCODE_DUPLICATE_JSON_OBJECT_KEY_VALUE:
+			case ERRCODE_INVALID_ARGUMENT_FOR_SQL_JSON_DATETIME_FUNCTION:
+			case ERRCODE_INVALID_JSON_TEXT:
+			case ERRCODE_INVALID_SQL_JSON_SUBSCRIPT:
+			case ERRCODE_MORE_THAN_ONE_SQL_JSON_ITEM:
+			case ERRCODE_NO_SQL_JSON_ITEM:
+			case ERRCODE_NON_NUMERIC_SQL_JSON_ITEM:
+			case ERRCODE_NON_UNIQUE_KEYS_IN_A_JSON_OBJECT:
+			case ERRCODE_SINGLETON_SQL_JSON_ITEM_REQUIRED:
+			case ERRCODE_SQL_JSON_ARRAY_NOT_FOUND:
+			case ERRCODE_SQL_JSON_MEMBER_NOT_FOUND:
+			case ERRCODE_SQL_JSON_NUMBER_NOT_FOUND:
+			case ERRCODE_SQL_JSON_OBJECT_NOT_FOUND:
+			case ERRCODE_TOO_MANY_JSON_ARRAY_ELEMENTS:
+			case ERRCODE_TOO_MANY_JSON_OBJECT_MEMBERS:
+			case ERRCODE_SQL_JSON_SCALAR_REQUIRED:
+			case ERRCODE_SQL_JSON_ITEM_CANNOT_BE_CAST_TO_TARGET_TYPE:
+				sfcstate->errors++;
+				if (sfcstate->errors <= 100)
+					ereport(WARNING,
+							(errcode(errdata->sqlerrcode),
+							errmsg("%s", errdata->context)));
+
+				sfcstate->begin_subxact = true;
+
+				break;
+			default:
+				PG_RE_THROW();
+		}
+
+		FlushErrorState();
+		FreeErrorData(errdata);
+		errdata = NULL;
+
+		MemoryContextSwitchTo(ecxt);
+	}
+	PG_END_TRY();
+
+	return true;
+}
+
 /*
  * Read next tuple from file for COPY FROM. Return false if no more tuples.
  *
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b5ab9d9c9a..b49954c0aa 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -808,7 +808,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	HANDLER HAVING HEADER_P HOLD HOUR_P
 
-	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
+	IDENTITY_P IF_P IGNORE_ERRORS ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IMPORT_P IN_P INCLUDE
 	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
 	INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
 	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
@@ -3477,6 +3477,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("freeze", (Node *) makeBoolean(true), @1);
 				}
+			| IGNORE_ERRORS
+				{
+					$$ = makeDefElem("ignore_errors", (Node *)makeBoolean(true), @1);
+				}
 			| DELIMITER opt_as Sconst
 				{
 					$$ = makeDefElem("delimiter", (Node *) makeString($3), @1);
@@ -17778,6 +17782,7 @@ unreserved_keyword:
 			| HOUR_P
 			| IDENTITY_P
 			| IF_P
+			| IGNORE_ERRORS
 			| IMMEDIATE
 			| IMMUTABLE
 			| IMPLICIT_P
@@ -18357,6 +18362,7 @@ bare_label_keyword:
 			| HOLD
 			| IDENTITY_P
 			| IF_P
+			| IGNORE_ERRORS
 			| ILIKE
 			| IMMEDIATE
 			| IMMUTABLE
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 62a39779b9..fe590ff7a8 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2748,7 +2748,8 @@ psql_completion(const char *text, int start, int end)
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "("))
 		COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL",
 					  "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE",
-					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING");
+					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING",
+					  "IGNORE_ERRORS");
 
 	/* Complete COPY <sth> FROM|TO filename WITH (FORMAT */
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT"))
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index cb0096aeb6..006e1024e1 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -42,6 +42,7 @@ typedef struct CopyFormatOptions
 								 * -1 if not specified */
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
+	bool		ignore_errors;  /* ignore rows with errors */
 	bool		csv_mode;		/* Comma Separated Value format? */
 	CopyHeaderChoice header_line;	/* header line? */
 	char	   *null_print;		/* NULL marker string (server encoding!) */
@@ -76,6 +77,8 @@ extern CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *where
 								   const char *filename,
 								   bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options);
 extern void EndCopyFrom(CopyFromState cstate);
+extern bool safeNextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+							 Datum *values, bool *nulls);
 extern bool NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 						 Datum *values, bool *nulls);
 extern bool NextCopyFromRawFields(CopyFromState cstate,
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index e37c6032ae..9100d5f247 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
 
 #include "commands/copy.h"
 #include "commands/trigger.h"
+#include "utils/resowner.h"
 
 /*
  * Represents the different source cases we need to worry about at
@@ -49,6 +50,24 @@ typedef enum CopyInsertMethod
 	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
+/*
+ * Struct that holding fields for ignore_errors option
+ */
+typedef struct SafeCopyFromState
+{
+#define			REPLAY_BUFFER_SIZE 1000
+	HeapTuple	   *replay_buffer; 			/* accumulates tuples for replaying it after an error */
+	int				saved_tuples;			/* # of tuples in replay_buffer */
+	int 			replayed_tuples;		/* # of tuples was replayed from buffer */
+	int				errors;					/* total # of errors */
+	bool			replay_is_active;		/* if true we replay tuples from buffer */
+	bool			begin_subxact;			/* if true we can begin subtransaction */
+
+	MemoryContext	replay_cxt;
+	MemoryContext	oldcontext;
+	ResourceOwner	oldowner;
+} SafeCopyFromState;
+
 /*
  * This struct contains all the state variables used throughout a COPY FROM
  * operation.
@@ -71,6 +90,7 @@ typedef struct CopyFromStateData
 	char	   *filename;		/* filename, or NULL for STDIN */
 	bool		is_program;		/* is 'filename' a program to popen? */
 	copy_data_source_cb data_source_cb; /* function for reading data */
+	SafeCopyFromState *sfcstate; /* struct for ignore_errors option */
 
 	CopyFormatOptions opts;
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index ae35f03251..2af11bd359 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -201,6 +201,7 @@ PG_KEYWORD("hold", HOLD, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("hour", HOUR_P, UNRESERVED_KEYWORD, AS_LABEL)
 PG_KEYWORD("identity", IDENTITY_P, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("if", IF_P, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("ignore_errors", IGNORE_ERRORS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("ilike", ILIKE, TYPE_FUNC_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("immediate", IMMEDIATE, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("immutable", IMMUTABLE, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 5f3685e9ef..acf4917e64 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -649,6 +649,135 @@ SELECT * FROM instead_of_insert_tbl;
 (2 rows)
 
 COMMIT;
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int, m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 8 | {8} | 8
+(2 rows)
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[], k int);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 8 | {8} | 8
+(2 rows)
+
+DROP TRIGGER trig_before on check_ign_err;
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err_view, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err_view, line 3: "3	{3}"
+WARNING:  COPY check_ign_err_view, line 4, column n: "a"
+WARNING:  COPY check_ign_err_view, line 5, column k: "5555555555"
+WARNING:  COPY check_ign_err_view, line 6, column n: ""
+WARNING:  COPY check_ign_err_view, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM trig_test;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 8 | {8} | 8
+(2 rows)
+
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+-- foreign table case is in postgres_fdw extension
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+  WHERE n = floor(random()*(1-1+1))+1; /* finds values equal 1 */
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+(1 row)
+
+DROP TABLE check_ign_err;
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int, m int[], k int)
+  PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+  FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+  FOR VALUES FROM (4) TO (9);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+WARNING:  COPY check_ign_err, line 2: "2	{2}	2	2"
+WARNING:  COPY check_ign_err, line 3: "3	{3}"
+WARNING:  COPY check_ign_err, line 4, column n: "a"
+WARNING:  COPY check_ign_err, line 5, column k: "5555555555"
+WARNING:  COPY check_ign_err, line 6, column n: ""
+WARNING:  COPY check_ign_err, line 7, column m: "{a, 7}"
+WARNING:  6 errors
+SELECT * FROM check_ign_err;
+ n |  m  | k 
+---+-----+---
+ 1 | {1} | 1
+ 8 | {8} | 8
+(2 rows)
+
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index b3c16af48e..b25b20182e 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -454,6 +454,122 @@ test1
 SELECT * FROM instead_of_insert_tbl;
 COMMIT;
 
+-- tests for IGNORE_ERRORS option
+-- CIM_MULTI case
+CREATE TABLE check_ign_err (n int, m int[], k int);
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+
+-- CIM_SINGLE cases
+-- BEFORE row trigger
+TRUNCATE check_ign_err;
+CREATE TABLE trig_test(n int, m int[], k int);
+CREATE FUNCTION fn_trig_before () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before on check_ign_err;
+
+-- INSTEAD OF row trigger
+TRUNCATE check_ign_err;
+TRUNCATE trig_test;
+CREATE VIEW check_ign_err_view AS SELECT * FROM check_ign_err;
+CREATE FUNCTION fn_trig_instead_of () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m, NEW.k);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_instead_of INSTEAD OF INSERT ON check_ign_err_view
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_instead_of();
+COPY check_ign_err_view FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM trig_test;
+DROP TRIGGER trig_instead_of ON check_ign_err_view;
+DROP VIEW check_ign_err_view;
+
+-- foreign table case is in postgres_fdw extension
+
+-- volatile function in WHERE clause
+TRUNCATE check_ign_err;
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS
+  WHERE n = floor(random()*(1-1+1))+1; /* finds values equal 1 */
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+DROP TABLE check_ign_err;
+
+-- CIM_MULTI_CONDITIONAL case
+-- INSERT triggers for partition tables
+TRUNCATE trig_test;
+CREATE TABLE check_ign_err (n int, m int[], k int)
+  PARTITION BY RANGE (k);
+CREATE TABLE check_ign_err_part1 PARTITION OF check_ign_err
+  FOR VALUES FROM (1) TO (4);
+CREATE TABLE check_ign_err_part2 PARTITION OF check_ign_err
+  FOR VALUES FROM (4) TO (9);
+CREATE FUNCTION fn_trig_before_part () RETURNS TRIGGER AS '
+  BEGIN
+    INSERT INTO trig_test VALUES(NEW.n, NEW.m);
+    RETURN NEW;
+  END;
+' LANGUAGE plpgsql;
+CREATE TRIGGER trig_before_part BEFORE INSERT ON check_ign_err
+FOR EACH ROW EXECUTE PROCEDURE fn_trig_before_part();
+COPY check_ign_err FROM STDIN WITH IGNORE_ERRORS WHERE n < 9;
+1	{1}	1
+2	{2}	2	2
+3	{3}
+a	{4}	4
+5	{5}	5555555555
+
+7	{a, 7}	7
+8	{8}	8
+\.
+SELECT * FROM check_ign_err;
+DROP TRIGGER trig_before_part on check_ign_err;
+DROP TABLE trig_test;
+DROP TABLE check_ign_err CASCADE;
+
 -- clean up
 DROP TABLE forcetest;
 DROP TABLE vistest;

Reply via email to