Hi,

> >     ERROR_LIMIT '<replaceable
> class="parameter">limit_number</replaceable>'
> >
> > I think this should be:
> >
> >     ERROR_LIMIT <replaceable class="parameter">limit_number</replaceable>
> >
> > (no single quote)
>
>
Thank you. Fixed.

> More comments:
>
> - I think the document should stat that if limit_number = 0, all
>   errors are immediately raised (behaves same as current befavior without
> the patch).
>
>
If we want all errors to be raised, limit_number does not need to be
specified. But if it is specified as limit_number = 0, I think the
behavior is self-explanatory.


> - "constraint violating rows will be returned back to the caller."
>   This does explains the current implementation. I am not sure if it's
>   intended or not though:
>
> cat /tmp/a
> 1       1
> 2       2
> 3       3
> 3       4
>
> psql test
> $ psql test
> psql (13devel)
> Type "help" for help.
>
> test=# select * from t1;
>  i | j
> ---+---
>  1 | 1
>  2 | 2
>  3 | 3
> (3 rows)
>
> test=# copy t1 from '/tmp/a' with (error_limit 1);
> ERROR:  duplicate key value violates unique constraint "t1_pkey"
> DETAIL:  Key (i)=(2) already exists.
> CONTEXT:  COPY t1, line 2: "2   2"
>
> So if the number of errors raised exceeds error_limit, no constaraint
> violating rows (in this case i=1, j=1) are returned.
>

error_limit is specified to dictate the number of errors allowed for the
copy operation to proceed. If that number is exceeded, the operation is
stopped. There may be more conflicts afterward, so returning a limited
number of conflicting rows is of little use.

regards
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index a99f8155e4..c53e5f6d92 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] )
     FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] )
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
+    ERROR_LIMIT <replaceable class="parameter">limit_number</replaceable>
 </synopsis>
  </refsynopsisdiv>
 
@@ -355,6 +356,26 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>ERROR_LIMIT</literal></term>
+    <listitem>
+     <para>
+      Enables ignoring of errored out rows up to <replaceable
+      class="parameter">limit_number</replaceable>. If <replaceable
+      class="parameter">limit_number</replaceable> is set
+      to -1, then all errors will be ignored.
+     </para>
+
+     <para>
+      Currently, only unique or exclusion constraint violations
+      and row formatting errors are ignored. Malformed
+      rows will raise warnings, while constraint-violating rows
+      will be returned to the caller.
+     </para>
+
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WHERE</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 40a8ec1abd..72225a85a0 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -24,6 +24,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "access/printtup.h"
 #include "catalog/dependency.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
@@ -48,7 +49,9 @@
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
+#include "tcop/pquery.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -153,6 +156,7 @@ typedef struct CopyStateData
 	List	   *convert_select; /* list of column names (can be NIL) */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
+	int			error_limit;	/* total number of error to ignore */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
@@ -182,6 +186,9 @@ typedef struct CopyStateData
 	bool		volatile_defexprs;	/* is any of defexprs volatile? */
 	List	   *range_table;
 	ExprState  *qualexpr;
+	bool		ignore_error;	/* is ignore error specified? */
+	bool		ignore_all_error;	/* is error_limit -1 (ignore all error)
+									 * specified? */
 
 	TransitionCaptureState *transition_capture;
 
@@ -836,7 +843,7 @@ CopyLoadRawBuf(CopyState cstate)
 void
 DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	   int stmt_location, int stmt_len,
-	   uint64 *processed)
+	   uint64 *processed, DestReceiver *dest)
 {
 	CopyState	cstate;
 	bool		is_from = stmt->is_from;
@@ -1068,7 +1075,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
 							   NULL, stmt->attlist, stmt->options);
 		cstate->whereClause = whereClause;
-		*processed = CopyFrom(cstate);	/* copy from file to database */
+		*processed = CopyFrom(cstate, dest);	/* copy from file to database */
 		EndCopyFrom(cstate);
 	}
 	else
@@ -1290,6 +1297,18 @@ ProcessCopyOptions(ParseState *pstate,
 								defel->defname),
 						 parser_errposition(pstate, defel->location)));
 		}
+		else if (strcmp(defel->defname, "error_limit") == 0)
+		{
+			if (cstate->ignore_error)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->error_limit = defGetInt64(defel);
+			cstate->ignore_error = true;
+			if (cstate->error_limit == -1)
+				cstate->ignore_all_error = true;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1440,6 +1459,10 @@ ProcessCopyOptions(ParseState *pstate,
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("CSV quote character must not appear in the NULL specification")));
+	if (cstate->ignore_error && !cstate->is_copy_from)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("ERROR LIMIT only available using COPY FROM")));
 }
 
 /*
@@ -2653,7 +2676,7 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
  * Copy FROM file to relation.
  */
 uint64
-CopyFrom(CopyState cstate)
+CopyFrom(CopyState cstate, DestReceiver *dest)
 {
 	ResultRelInfo *resultRelInfo;
 	ResultRelInfo *target_resultRelInfo;
@@ -2675,6 +2698,7 @@ CopyFrom(CopyState cstate)
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
+	Portal		portal = NULL;
 
 	Assert(cstate->rel);
 
@@ -2838,7 +2862,19 @@ CopyFrom(CopyState cstate)
 	/* Verify the named relation is a valid target for INSERT */
 	CheckValidResultRel(resultRelInfo, CMD_INSERT);
 
-	ExecOpenIndices(resultRelInfo, false);
+	if (cstate->ignore_error)
+	{
+		TupleDesc	tupDesc;
+
+		ExecOpenIndices(resultRelInfo, true);
+		tupDesc = RelationGetDescr(cstate->rel);
+
+		portal = GetPortalByName("");
+		SetRemoteDestReceiverParams(dest, portal);
+		dest->rStartup(dest, (int) CMD_SELECT, tupDesc);
+	}
+	else
+		ExecOpenIndices(resultRelInfo, false);
 
 	estate->es_result_relations = resultRelInfo;
 	estate->es_num_result_relations = 1;
@@ -2943,6 +2979,13 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
+	else if (cstate->ignore_error)
+	{
+		/*
+		 * Can't support speculative insertion in multi-inserts.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
 	else
 	{
 		/*
@@ -3286,6 +3329,63 @@ CopyFrom(CopyState cstate)
 						 */
 						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
+					else if ((cstate->error_limit > 0 || cstate->ignore_all_error) && resultRelInfo->ri_NumIndices > 0)
+					{
+						/* Perform a speculative insertion. */
+						uint32		specToken;
+						ItemPointerData conflictTid;
+						bool		specConflict;
+
+						/*
+						 * Do a non-conclusive check for conflicts first.
+						 */
+						specConflict = false;
+
+						if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid,
+													   NIL))
+						{
+							(void) dest->receiveSlot(myslot, dest);
+							cstate->error_limit--;
+							continue;
+						}
+
+						/*
+						 * Acquire our speculative insertion lock.
+						 */
+						specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+
+						/* insert the tuple, with the speculative token */
+						table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot,
+													   estate->es_output_cid,
+													   0,
+													   NULL,
+													   specToken);
+
+						/* insert index entries for tuple */
+						recheckIndexes = ExecInsertIndexTuples(myslot, estate, true,
+															   &specConflict,
+															   NIL);
+
+						/* adjust the tuple's state accordingly */
+						table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot,
+														 specToken, !specConflict);
+
+						/*
+						 * Wake up anyone waiting for our decision.
+						 */
+						SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+						/*
+						 * If there was a conflict, return it and preceded to
+						 * the next record if there are any.
+						 */
+						if (specConflict)
+						{
+							(void) dest->receiveSlot(myslot, dest);
+							cstate->error_limit--;
+							continue;
+						}
+					}
 					else
 					{
 						/* OK, store the tuple and create index entries for it */
@@ -3703,7 +3803,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 	/* Initialize all values for row to NULL */
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
 	if (!cstate->binary)
 	{
 		char	  **field_strings;
@@ -3718,9 +3818,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
 		/* check for overflowing fields */
 		if (attr_count > 0 && fldct > attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+		{
+			if (cstate->error_limit > 0 || cstate->ignore_all_error)
+			{
+				ereport(WARNING,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("skipping \"%s\" --- extra data after last expected column",
+								cstate->line_buf.data)));
+				cstate->error_limit--;
+				goto next_line;
+			}
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -3732,10 +3844,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
+			{
+				if (cstate->error_limit > 0 || cstate->ignore_all_error)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("skipping \"%s\" --- missing data for column \"%s\"",
+									cstate->line_buf.data, NameStr(att->attname))));
+					cstate->error_limit--;
+					goto next_line;
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("missing data for column \"%s\"",
+									NameStr(att->attname))));
+			}
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -3822,10 +3946,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		}
 
 		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+		{
+			if (cstate->error_limit > 0 || cstate->ignore_all_error)
+			{
+				ereport(WARNING,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("skipping \"%s\" --- row field count is %d, expected %d",
+								cstate->line_buf.data, (int) fld_count, attr_count)));
+				cstate->error_limit--;
+				goto next_line;
+			}
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("row field count is %d, expected %d",
+								(int) fld_count, attr_count)));
+
+		}
 
 		i = 0;
 		foreach(cur, cstate->attnumlist)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f8183cd488..817d0af002 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -784,7 +784,7 @@ copy_table(Relation rel)
 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
 
 	/* Do the copy */
-	(void) CopyFrom(cstate);
+	(void) CopyFrom(cstate, NULL);
 
 	logicalrep_rel_close(relmapentry, NoLock);
 }
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index bb85b5e52a..746a2a5160 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -728,7 +728,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 
 				DoCopy(pstate, (CopyStmt *) parsetree,
 					   pstmt->stmt_location, pstmt->stmt_len,
-					   &processed);
+					   &processed, dest);
 				if (completionTag)
 					snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
 							 "COPY " UINT64_FORMAT, processed);
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 67df0cd2c7..34869aaec6 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -892,6 +892,7 @@ ProcessResult(PGresult **results)
 {
 	bool		success = true;
 	bool		first_cycle = true;
+	bool		is_copy_in = false;
 
 	for (;;)
 	{
@@ -1015,6 +1016,7 @@ ProcessResult(PGresult **results)
 									   copystream,
 									   PQbinaryTuples(*results),
 									   &copy_result) && success;
+				is_copy_in = true;
 			}
 			ResetCancelConn();
 
@@ -1045,6 +1047,11 @@ ProcessResult(PGresult **results)
 		first_cycle = false;
 	}
 
+	/* Print returned result  for COPY FROM with error_limit. */
+	if (is_copy_in && !success && PQresultStatus(*results) !=
+		PGRES_FATAL_ERROR)
+		(void) PrintQueryTuples(*results);
+
 	SetResultVariables(*results, success);
 
 	/* may need this to recover from conn loss during COPY */
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..addd8054d6 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -25,7 +25,7 @@ typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
-				   uint64 *processed);
+				   uint64 *processed, DestReceiver *dest);
 
 extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
 extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
@@ -37,7 +37,7 @@ extern bool NextCopyFromRawFields(CopyState cstate,
 								  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 
-extern uint64 CopyFrom(CopyState cstate);
+extern uint64 CopyFrom(CopyState cstate, DestReceiver *dest);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index e40287d25a..773e965970 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -55,6 +55,15 @@ LINE 1: COPY x TO stdout WHERE a = 1;
                          ^
 COPY x from stdin WHERE a = 50004;
 COPY x from stdin WHERE a > 60003;
+COPY x from stdin WITH(ERROR_LIMIT 5);
+WARNING:  skipping "70001	22	32" --- missing data for column "d"
+WARNING:  skipping "70002	23	33	43	53	54" --- extra data after last expected column
+WARNING:  skipping "70003	24	34	44" --- missing data for column "e"
+
+ a | b | c | d | e 
+---+---+---+---+---
+(0 rows)
+
 COPY x from stdin WHERE f > 60003;
 ERROR:  column "f" does not exist
 LINE 1: COPY x from stdin WHERE f > 60003;
@@ -102,12 +111,14 @@ SELECT * FROM x;
  50004 | 25 | 35         | 45     | before trigger fired
  60004 | 25 | 35         | 45     | before trigger fired
  60005 | 26 | 36         | 46     | before trigger fired
+ 70004 | 25 | 35         | 45     | before trigger fired
+ 70005 | 26 | 36         | 46     | before trigger fired
      1 |  1 | stuff      | test_1 | after trigger fired
      2 |  2 | stuff      | test_2 | after trigger fired
      3 |  3 | stuff      | test_3 | after trigger fired
      4 |  4 | stuff      | test_4 | after trigger fired
      5 |  5 | stuff      | test_5 | after trigger fired
-(28 rows)
+(30 rows)
 
 -- check copy out
 COPY x TO stdout;
@@ -134,6 +145,8 @@ COPY x TO stdout;
 50004	25	35	45	before trigger fired
 60004	25	35	45	before trigger fired
 60005	26	36	46	before trigger fired
+70004	25	35	45	before trigger fired
+70005	26	36	46	before trigger fired
 1	1	stuff	test_1	after trigger fired
 2	2	stuff	test_2	after trigger fired
 3	3	stuff	test_3	after trigger fired
@@ -163,6 +176,8 @@ Delimiter	before trigger fired
 35	before trigger fired
 35	before trigger fired
 36	before trigger fired
+35	before trigger fired
+36	before trigger fired
 stuff	after trigger fired
 stuff	after trigger fired
 stuff	after trigger fired
@@ -192,6 +207,8 @@ I'm null	before trigger fired
 25	before trigger fired
 25	before trigger fired
 26	before trigger fired
+25	before trigger fired
+26	before trigger fired
 1	after trigger fired
 2	after trigger fired
 3	after trigger fired
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 902f4fac19..2378f428fc 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -110,6 +110,14 @@ COPY x from stdin WHERE a > 60003;
 60005	26	36	46	56
 \.
 
+COPY x from stdin WITH(ERROR_LIMIT 5);
+70001	22	32
+70002	23	33	43	53	54
+70003	24	34	44
+70004	25	35	45	55
+70005	26	36	46	56
+\.
+
 COPY x from stdin WHERE f > 60003;
 
 COPY x from stdin WHERE a = max(x.b);

Reply via email to