On Mon, Feb 17, 2020 at 10:00 AM Tatsuo Ishii <is...@sraoss.co.jp> wrote:

> >> test=# copy t1 from '/tmp/a' with (error_limit 1);
> >> ERROR:  duplicate key value violates unique constraint "t1_pkey"
> >> DETAIL:  Key (i)=(2) already exists.
> >> CONTEXT:  COPY t1, line 2: "2   2"
> >>
> >> So if the number of errors raised exceeds error_limit, no constraint
> >> violating rows (in this case i=1, j=1) are returned.
> >>
> >
> > error_limit specifies the number of errors allowed for the copy
> > operation to proceed. If the number of errors exceeds it, the operation
> > is stopped. There may be more conflicts afterward, and returning a
> > limited number of conflicting rows is of little use.
>
> Still I see your explanation differs from what the document patch says.
>
> +      Currently, only unique or exclusion constraint violation
> +      and rows formatting errors are ignored. Malformed
> +      rows will rise warnings, while constraint violating rows
> +      will be returned back to the caller.
>
> I am afraid that once this patch is part of the next version of PostgreSQL,
> we will get many complaints/inquiries from users. What about changing it
> like this:
>
>       Currently, only unique or exclusion constraint violations and
>       row formatting errors are ignored. Malformed rows will raise
>       warnings, while constraint-violating rows will be returned
>       to the caller unless an error is raised; i.e., if an error is
>       raised because error_limit is exceeded, no rows will be returned
>       to the caller.
>

 It's better that way, so I have amended the patch accordingly.

regards
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index a99f8155e4..845902b824 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] )
     FORCE_NULL ( <replaceable class="parameter">column_name</replaceable> [, ...] )
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
+    ERROR_LIMIT <replaceable class="parameter">limit_number</replaceable>
 </synopsis>
  </refsynopsisdiv>
 
@@ -355,6 +356,28 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>ERROR_LIMIT</literal></term>
+    <listitem>
+     <para>
+      Enables ignoring of errored out rows up to <replaceable
+      class="parameter">limit_number</replaceable>. If <replaceable
+      class="parameter">limit_number</replaceable> is set
+      to -1, then all errors will be ignored.
+     </para>
+
+     <para>
+      Currently, only unique or exclusion constraint violations
+      and row formatting errors are ignored. Malformed
+      rows will raise warnings, while constraint-violating rows
+      will be returned to the caller unless an error is raised;
+      i.e., if an error is raised because error_limit is exceeded, no rows
+      will be returned to the caller.
+     </para>
+
+    </listitem>
+   </varlistentry>
+
    <varlistentry>
     <term><literal>WHERE</literal></term>
     <listitem>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 40a8ec1abd..72225a85a0 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -24,6 +24,7 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "access/printtup.h"
 #include "catalog/dependency.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_type.h"
@@ -48,7 +49,9 @@
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
+#include "tcop/pquery.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
@@ -153,6 +156,7 @@ typedef struct CopyStateData
 	List	   *convert_select; /* list of column names (can be NIL) */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
+	int			error_limit;	/* total number of error to ignore */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
@@ -182,6 +186,9 @@ typedef struct CopyStateData
 	bool		volatile_defexprs;	/* is any of defexprs volatile? */
 	List	   *range_table;
 	ExprState  *qualexpr;
+	bool		ignore_error;	/* is ignore error specified? */
+	bool		ignore_all_error;	/* is error_limit -1 (ignore all error)
+									 * specified? */
 
 	TransitionCaptureState *transition_capture;
 
@@ -836,7 +843,7 @@ CopyLoadRawBuf(CopyState cstate)
 void
 DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	   int stmt_location, int stmt_len,
-	   uint64 *processed)
+	   uint64 *processed, DestReceiver *dest)
 {
 	CopyState	cstate;
 	bool		is_from = stmt->is_from;
@@ -1068,7 +1075,7 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
 							   NULL, stmt->attlist, stmt->options);
 		cstate->whereClause = whereClause;
-		*processed = CopyFrom(cstate);	/* copy from file to database */
+		*processed = CopyFrom(cstate, dest);	/* copy from file to database */
 		EndCopyFrom(cstate);
 	}
 	else
@@ -1290,6 +1297,18 @@ ProcessCopyOptions(ParseState *pstate,
 								defel->defname),
 						 parser_errposition(pstate, defel->location)));
 		}
+		else if (strcmp(defel->defname, "error_limit") == 0)
+		{
+			if (cstate->ignore_error)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->error_limit = defGetInt64(defel);
+			cstate->ignore_error = true;
+			if (cstate->error_limit == -1)
+				cstate->ignore_all_error = true;
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1440,6 +1459,10 @@ ProcessCopyOptions(ParseState *pstate,
 		ereport(ERROR,
 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
 				 errmsg("CSV quote character must not appear in the NULL specification")));
+	if (cstate->ignore_error && !cstate->is_copy_from)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("ERROR LIMIT only available using COPY FROM")));
 }
 
 /*
@@ -2653,7 +2676,7 @@ CopyMultiInsertInfoStore(CopyMultiInsertInfo *miinfo, ResultRelInfo *rri,
  * Copy FROM file to relation.
  */
 uint64
-CopyFrom(CopyState cstate)
+CopyFrom(CopyState cstate, DestReceiver *dest)
 {
 	ResultRelInfo *resultRelInfo;
 	ResultRelInfo *target_resultRelInfo;
@@ -2675,6 +2698,7 @@ CopyFrom(CopyState cstate)
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
+	Portal		portal = NULL;
 
 	Assert(cstate->rel);
 
@@ -2838,7 +2862,19 @@ CopyFrom(CopyState cstate)
 	/* Verify the named relation is a valid target for INSERT */
 	CheckValidResultRel(resultRelInfo, CMD_INSERT);
 
-	ExecOpenIndices(resultRelInfo, false);
+	if (cstate->ignore_error)
+	{
+		TupleDesc	tupDesc;
+
+		ExecOpenIndices(resultRelInfo, true);
+		tupDesc = RelationGetDescr(cstate->rel);
+
+		portal = GetPortalByName("");
+		SetRemoteDestReceiverParams(dest, portal);
+		dest->rStartup(dest, (int) CMD_SELECT, tupDesc);
+	}
+	else
+		ExecOpenIndices(resultRelInfo, false);
 
 	estate->es_result_relations = resultRelInfo;
 	estate->es_num_result_relations = 1;
@@ -2943,6 +2979,13 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
+	else if (cstate->ignore_error)
+	{
+		/*
+		 * Can't support speculative insertion in multi-inserts.
+		 */
+		insertMethod = CIM_SINGLE;
+	}
 	else
 	{
 		/*
@@ -3286,6 +3329,63 @@ CopyFrom(CopyState cstate)
 						 */
 						myslot->tts_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
+					else if ((cstate->error_limit > 0 || cstate->ignore_all_error) && resultRelInfo->ri_NumIndices > 0)
+					{
+						/* Perform a speculative insertion. */
+						uint32		specToken;
+						ItemPointerData conflictTid;
+						bool		specConflict;
+
+						/*
+						 * Do a non-conclusive check for conflicts first.
+						 */
+						specConflict = false;
+
+						if (!ExecCheckIndexConstraints(myslot, estate, &conflictTid,
+													   NIL))
+						{
+							(void) dest->receiveSlot(myslot, dest);
+							cstate->error_limit--;
+							continue;
+						}
+
+						/*
+						 * Acquire our speculative insertion lock.
+						 */
+						specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+
+						/* insert the tuple, with the speculative token */
+						table_tuple_insert_speculative(resultRelInfo->ri_RelationDesc, myslot,
+													   estate->es_output_cid,
+													   0,
+													   NULL,
+													   specToken);
+
+						/* insert index entries for tuple */
+						recheckIndexes = ExecInsertIndexTuples(myslot, estate, true,
+															   &specConflict,
+															   NIL);
+
+						/* adjust the tuple's state accordingly */
+						table_tuple_complete_speculative(resultRelInfo->ri_RelationDesc, myslot,
+														 specToken, !specConflict);
+
+						/*
+						 * Wake up anyone waiting for our decision.
+						 */
+						SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+						/*
+						 * If there was a conflict, return the conflicting row and
+						 * proceed to the next record, if there is one.
+						 */
+						if (specConflict)
+						{
+							(void) dest->receiveSlot(myslot, dest);
+							cstate->error_limit--;
+							continue;
+						}
+					}
 					else
 					{
 						/* OK, store the tuple and create index entries for it */
@@ -3703,7 +3803,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 	/* Initialize all values for row to NULL */
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
 	if (!cstate->binary)
 	{
 		char	  **field_strings;
@@ -3718,9 +3818,21 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
 		/* check for overflowing fields */
 		if (attr_count > 0 && fldct > attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+		{
+			if (cstate->error_limit > 0 || cstate->ignore_all_error)
+			{
+				ereport(WARNING,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("skipping \"%s\" --- extra data after last expected column",
+								cstate->line_buf.data)));
+				cstate->error_limit--;
+				goto next_line;
+			}
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -3732,10 +3844,22 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
+			{
+				if (cstate->error_limit > 0 || cstate->ignore_all_error)
+				{
+					ereport(WARNING,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("skipping \"%s\" --- missing data for column \"%s\"",
+									cstate->line_buf.data, NameStr(att->attname))));
+					cstate->error_limit--;
+					goto next_line;
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("missing data for column \"%s\"",
+									NameStr(att->attname))));
+			}
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -3822,10 +3946,23 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		}
 
 		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+		{
+			if (cstate->error_limit > 0 || cstate->ignore_all_error)
+			{
+				ereport(WARNING,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("skipping \"%s\" --- row field count is %d, expected %d",
+								cstate->line_buf.data, (int) fld_count, attr_count)));
+				cstate->error_limit--;
+				goto next_line;
+			}
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("row field count is %d, expected %d",
+								(int) fld_count, attr_count)));
+
+		}
 
 		i = 0;
 		foreach(cur, cstate->attnumlist)
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index f8183cd488..817d0af002 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -784,7 +784,7 @@ copy_table(Relation rel)
 	cstate = BeginCopyFrom(pstate, rel, NULL, false, copy_read_data, attnamelist, NIL);
 
 	/* Do the copy */
-	(void) CopyFrom(cstate);
+	(void) CopyFrom(cstate, NULL);
 
 	logicalrep_rel_close(relmapentry, NoLock);
 }
diff --git a/src/backend/tcop/utility.c b/src/backend/tcop/utility.c
index bb85b5e52a..746a2a5160 100644
--- a/src/backend/tcop/utility.c
+++ b/src/backend/tcop/utility.c
@@ -728,7 +728,7 @@ standard_ProcessUtility(PlannedStmt *pstmt,
 
 				DoCopy(pstate, (CopyStmt *) parsetree,
 					   pstmt->stmt_location, pstmt->stmt_len,
-					   &processed);
+					   &processed, dest);
 				if (completionTag)
 					snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
 							 "COPY " UINT64_FORMAT, processed);
diff --git a/src/bin/psql/common.c b/src/bin/psql/common.c
index 67df0cd2c7..34869aaec6 100644
--- a/src/bin/psql/common.c
+++ b/src/bin/psql/common.c
@@ -892,6 +892,7 @@ ProcessResult(PGresult **results)
 {
 	bool		success = true;
 	bool		first_cycle = true;
+	bool		is_copy_in = false;
 
 	for (;;)
 	{
@@ -1015,6 +1016,7 @@ ProcessResult(PGresult **results)
 									   copystream,
 									   PQbinaryTuples(*results),
 									   &copy_result) && success;
+				is_copy_in = true;
 			}
 			ResetCancelConn();
 
@@ -1045,6 +1047,11 @@ ProcessResult(PGresult **results)
 		first_cycle = false;
 	}
 
+	/* Print returned result  for COPY FROM with error_limit. */
+	if (is_copy_in && !success && PQresultStatus(*results) !=
+		PGRES_FATAL_ERROR)
+		(void) PrintQueryTuples(*results);
+
 	SetResultVariables(*results, success);
 
 	/* may need this to recover from conn loss during COPY */
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c639833565..addd8054d6 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -25,7 +25,7 @@ typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 
 extern void DoCopy(ParseState *state, const CopyStmt *stmt,
 				   int stmt_location, int stmt_len,
-				   uint64 *processed);
+				   uint64 *processed, DestReceiver *dest);
 
 extern void ProcessCopyOptions(ParseState *pstate, CopyState cstate, bool is_from, List *options);
 extern CopyState BeginCopyFrom(ParseState *pstate, Relation rel, const char *filename,
@@ -37,7 +37,7 @@ extern bool NextCopyFromRawFields(CopyState cstate,
 								  char ***fields, int *nfields);
 extern void CopyFromErrorCallback(void *arg);
 
-extern uint64 CopyFrom(CopyState cstate);
+extern uint64 CopyFrom(CopyState cstate, DestReceiver *dest);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index e40287d25a..773e965970 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -55,6 +55,15 @@ LINE 1: COPY x TO stdout WHERE a = 1;
                          ^
 COPY x from stdin WHERE a = 50004;
 COPY x from stdin WHERE a > 60003;
+COPY x from stdin WITH(ERROR_LIMIT 5);
+WARNING:  skipping "70001	22	32" --- missing data for column "d"
+WARNING:  skipping "70002	23	33	43	53	54" --- extra data after last expected column
+WARNING:  skipping "70003	24	34	44" --- missing data for column "e"
+
+ a | b | c | d | e 
+---+---+---+---+---
+(0 rows)
+
 COPY x from stdin WHERE f > 60003;
 ERROR:  column "f" does not exist
 LINE 1: COPY x from stdin WHERE f > 60003;
@@ -102,12 +111,14 @@ SELECT * FROM x;
  50004 | 25 | 35         | 45     | before trigger fired
  60004 | 25 | 35         | 45     | before trigger fired
  60005 | 26 | 36         | 46     | before trigger fired
+ 70004 | 25 | 35         | 45     | before trigger fired
+ 70005 | 26 | 36         | 46     | before trigger fired
      1 |  1 | stuff      | test_1 | after trigger fired
      2 |  2 | stuff      | test_2 | after trigger fired
      3 |  3 | stuff      | test_3 | after trigger fired
      4 |  4 | stuff      | test_4 | after trigger fired
      5 |  5 | stuff      | test_5 | after trigger fired
-(28 rows)
+(30 rows)
 
 -- check copy out
 COPY x TO stdout;
@@ -134,6 +145,8 @@ COPY x TO stdout;
 50004	25	35	45	before trigger fired
 60004	25	35	45	before trigger fired
 60005	26	36	46	before trigger fired
+70004	25	35	45	before trigger fired
+70005	26	36	46	before trigger fired
 1	1	stuff	test_1	after trigger fired
 2	2	stuff	test_2	after trigger fired
 3	3	stuff	test_3	after trigger fired
@@ -163,6 +176,8 @@ Delimiter	before trigger fired
 35	before trigger fired
 35	before trigger fired
 36	before trigger fired
+35	before trigger fired
+36	before trigger fired
 stuff	after trigger fired
 stuff	after trigger fired
 stuff	after trigger fired
@@ -192,6 +207,8 @@ I'm null	before trigger fired
 25	before trigger fired
 25	before trigger fired
 26	before trigger fired
+25	before trigger fired
+26	before trigger fired
 1	after trigger fired
 2	after trigger fired
 3	after trigger fired
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index 902f4fac19..2378f428fc 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -110,6 +110,14 @@ COPY x from stdin WHERE a > 60003;
 60005	26	36	46	56
 \.
 
+COPY x from stdin WITH(ERROR_LIMIT 5);
+70001	22	32
+70002	23	33	43	53	54
+70003	24	34	44
+70004	25	35	45	55
+70005	26	36	46	56
+\.
+
 COPY x from stdin WHERE f > 60003;
 
 COPY x from stdin WHERE a = max(x.b);

Reply via email to