On Mon, Feb 4, 2019 at 9:06 AM Michael Paquier <mich...@paquier.xyz> wrote:

> On Wed, Dec 19, 2018 at 02:48:14PM +0300, Surafel Temesgen wrote:
> > Thank you for informing, attach is rebased patch against current
> > master
>
> copy.c conflicts on HEAD, please rebase.  I am moving the patch to
> next CF, waiting on author.
> --
>

Thank you, here is a rebased patch against current master

regards
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 254d3ab8eb..5ee70d62bf 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -380,6 +380,28 @@ WHERE <replaceable class="parameter">condition</replaceable>
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>on_conflict_log</literal></term>
+    <listitem>
+     <para>
+      Instead of raising an error, write up to the specified number of
+      conflicting or malformed records to the log file and
+      proceed to the next record.
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>log_file_name</literal></term>
+    <listitem>
+     <para>
+      The path name of the log file.  It must be an absolute
+      path.  Windows users might need to use an <literal>E''</literal> string and
+      double any backslashes used in the path name.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect1>
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index dbb06397e6..3c6afec5b3 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -46,6 +46,7 @@
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
+#include "storage/lmgr.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -123,6 +124,7 @@ typedef struct CopyStateData
 	int			file_encoding;	/* file or remote side's character encoding */
 	bool		need_transcoding;	/* file encoding diff from server? */
 	bool		encoding_embeds_ascii;	/* ASCII can be non-first byte? */
+	FILE	   *failed_rec_file;		/* used if ignore_conflict is true */
 
 	/* parameters from the COPY command */
 	Relation	rel;			/* relation to copy to or from */
@@ -152,6 +154,9 @@ typedef struct CopyStateData
 	List	   *convert_select; /* list of column names (can be NIL) */
 	bool	   *convert_select_flags;	/* per-column CSV/TEXT CS flags */
 	Node	   *whereClause;	/* WHERE condition (or NULL) */
+	char	   *failed_rec_filename;	/* failed record filename */
+	bool	   ignore_conflict;
+	int	   error_limit;			/* total # of error to log */
 
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
@@ -773,6 +778,21 @@ CopyLoadRawBuf(CopyState cstate)
 	return (inbytes > 0);
 }
 
+/*
+ * LogCopyError - append "str" plus a line ending to cstate->line_buf and write it to the failed-record file
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+#ifndef WIN32	/* the file is opened with PG_BINARY_W, so emit the platform line ending ourselves */
+	appendStringInfo(&cstate->line_buf, "%s\n", str);
+#else
+	appendStringInfo(&cstate->line_buf, "%s\r\n", str);
+#endif
+	if (fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file) != (size_t) cstate->line_buf.len)
+		ereport(ERROR, (errcode_for_file_access(), errmsg("could not write to file \"%s\": %m", cstate->failed_rec_filename)));	/* never drop a rejected record silently */
+	cstate->error_limit--;		/* one fewer error may be absorbed before COPY fails */
+}
 
 /*
  *	 DoCopy executes the SQL COPY statement
@@ -1249,6 +1269,32 @@ ProcessCopyOptions(ParseState *pstate,
 								defel->defname),
 						 parser_errposition(pstate, defel->location)));
 		}
+		else if (strcmp(defel->defname, "on_conflict_log") == 0)
+		{
+			if (cstate->ignore_conflict)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+
+			cstate->ignore_conflict = true;
+			cstate->error_limit =defGetInt64(defel);
+			if (cstate->error_limit < 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("argument to option \"%s\" must be positive number",
+								defel->defname),
+						 parser_errposition(pstate, defel->location)));
+		}
+		else if (strcmp(defel->defname, "log_file_name") == 0)
+		{
+			if (cstate->failed_rec_filename)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->failed_rec_filename =defGetString(defel);
+		}
 		else
 			ereport(ERROR,
 					(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1271,6 +1317,21 @@ ProcessCopyOptions(ParseState *pstate,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("cannot specify NULL in BINARY mode")));
 
+	if (!cstate->error_limit && cstate->failed_rec_filename)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify log file name without on conflict log option")));
+
+	if (cstate->error_limit && !cstate->failed_rec_filename)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify on conflict log without log file name option")));
+
+	if (cstate->error_limit && !cstate->is_copy_from)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify on conflict log on COPY TO")));
+
 	/* Set defaults for omitted options */
 	if (!cstate->delim)
 		cstate->delim = cstate->csv_mode ? "," : "\t";
@@ -1771,6 +1832,11 @@ EndCopy(CopyState cstate)
 					(errcode_for_file_access(),
 					 errmsg("could not close file \"%s\": %m",
 							cstate->filename)));
+		if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not close file \"%s\": %m",
+							cstate->failed_rec_filename)));
 	}
 
 	MemoryContextDelete(cstate->copycontext);
@@ -2492,6 +2558,8 @@ CopyFrom(CopyState cstate)
 		hi_options |= HEAP_INSERT_FROZEN;
 	}
 
+	if (!cstate->ignore_conflict)
+		cstate->error_limit = 0;
 	/*
 	 * We need a ResultRelInfo so we can use the regular executor's
 	 * index-entry-making machinery.  (There used to be a huge amount of code
@@ -2619,6 +2687,10 @@ CopyFrom(CopyState cstate)
 		 */
 		insertMethod = CIM_SINGLE;
 	}
+	else if (cstate->ignore_conflict)
+	{
+		insertMethod = CIM_SINGLE;
+	}
 	else
 	{
 		/*
@@ -3000,12 +3072,59 @@ CopyFrom(CopyState cstate)
 						 */
 						tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
 					}
+					else if (cstate->ignore_conflict && cstate->error_limit > 0)
+					{
+						bool		specConflict;
+						uint32		specToken;
+						specConflict = false;
+
+						specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+						HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+						/* insert the tuple, with the speculative token */
+						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+									estate->es_output_cid,
+									HEAP_INSERT_SPECULATIVE,
+									NULL);
+
+						/* insert index entries for tuple */
+						recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+													   estate, true, &specConflict,
+													   NIL);
+
+						/* adjust the tuple's state accordingly */
+						if (!specConflict)
+						{
+							heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+							processed++;
+						}
+						else
+						{
+							heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+#ifndef WIN32
+							appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+							appendBinaryStringInfo(&cstate->cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+							fwrite(cstate->line_buf.data, 1, cstate->line_buf.len, cstate->failed_rec_file);
+							cstate->error_limit--;
+
+						}
+
+						/*
+						 * Wake up anyone waiting for our decision.  They will re-check
+						 * the tuple, see that it's no longer speculative, and wait on our
+						 * XID as if this was a regularly inserted tuple all along.
+						 */
+						SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+					}
 					else
 						heap_insert(resultRelInfo->ri_RelationDesc, tuple,
 									mycid, hi_options, bistate);
 
 					/* And create index entries for it */
-					if (resultRelInfo->ri_NumIndices > 0)
+					if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0)
 						recheckIndexes = ExecInsertIndexTuples(slot,
 															   &(tuple->t_self),
 															   estate,
@@ -3026,7 +3145,8 @@ CopyFrom(CopyState cstate)
 			 * or FDW; this is the same definition used by nodeModifyTable.c
 			 * for counting tuples inserted by an INSERT command.
 			 */
-			processed++;
+			if(!cstate->ignore_conflict)
+				processed++;
 		}
 	}
 
@@ -3316,6 +3436,48 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->num_defaults = num_defaults;
 	cstate->is_program = is_program;
 
+	if (cstate->failed_rec_filename)
+	{
+		mode_t		oumask; /* Pre-existing umask value */
+		struct stat st;
+			/*
+			 * Prevent write to relative path ... too easy to shoot oneself in
+			 * the foot by overwriting a database file ...
+			 */
+			if (!is_absolute_path(cstate->failed_rec_filename))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_NAME),
+						 errmsg("relative path not allowed for failed record file")));
+			oumask = umask(S_IWGRP | S_IWOTH);
+			PG_TRY();
+			{
+				cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+			}
+			PG_CATCH();
+			{
+				umask(oumask);
+				PG_RE_THROW();
+			}
+			PG_END_TRY();
+			umask(oumask);
+			if (cstate->failed_rec_file == NULL)
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not open file \"%s\" for writing: %m",
+								cstate->failed_rec_filename)));
+
+			if (fstat(fileno(cstate->failed_rec_file), &st))
+				ereport(ERROR,
+						(errcode_for_file_access(),
+						 errmsg("could not stat file \"%s\": %m",
+								cstate->failed_rec_filename)));
+
+			if (S_ISDIR(st.st_mode))
+				ereport(ERROR,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						 errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+		}
+
 	if (data_source_cb)
 	{
 		cstate->copy_dest = COPY_CALLBACK;
@@ -3514,7 +3676,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 	/* Initialize all values for row to NULL */
 	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
 	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
 	if (!cstate->binary)
 	{
 		char	  **field_strings;
@@ -3529,9 +3691,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 
 		/* check for overflowing fields */
 		if (attr_count > 0 && fldct > attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+		{
+			if (cstate->ignore_conflict && cstate->error_limit > 0)
+			{
+				LogCopyError(cstate, " extra data after last expected column");
+				goto next_line;
+			}else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -3543,10 +3712,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					appendStringInfo(&cstate->line_buf, " missing data for column %s",
+								NameStr(att->attname));
+					LogCopyError(cstate, " ");
+					goto next_line;
+				}else
+
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("missing data for column \"%s\"",
+									NameStr(att->attname))));
+			}
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -3633,10 +3812,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 		}
 
 		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+		{
+			if (cstate->ignore_conflict && cstate->error_limit > 0)
+			{
+				appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d",
+						(int) fld_count, attr_count);
+				LogCopyError(cstate, " ");
+				goto next_line;
+			}else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						 errmsg("row field count is %d, expected %d",
+								(int) fld_count, attr_count)));
+		}
 
 		i = 0;
 		foreach(cur, cstate->attnumlist)
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index a68f78e0e0..74d5737d7a 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -631,7 +631,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
 	EXTENSION EXTERNAL EXTRACT
 
-	FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+	FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
 	FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
 
 	GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -649,7 +649,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 	LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
 	LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
-	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+	LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
 
 	MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
 
@@ -3047,6 +3047,14 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
 				}
+			| ON CONFLICT LOG_P Iconst
+				{
+					$$ = makeDefElem("on_conflict_log", (Node *)makeInteger($4), @1);
+				}
+			| LOG_P FILE_P NAME_P Sconst
+				{
+					$$ = makeDefElem("log_file_name", (Node *)makeString($4), @1);
+				}
 		;
 
 /* The following exist for backward compatibility with very old versions */
@@ -15033,6 +15041,7 @@ unreserved_keyword:
 			| EXTENSION
 			| EXTERNAL
 			| FAMILY
+			| FILE_P
 			| FILTER
 			| FIRST_P
 			| FOLLOWING
@@ -15081,6 +15090,7 @@ unreserved_keyword:
 			| LOCATION
 			| LOCK_P
 			| LOCKED
+			| LOG_P
 			| LOGGED
 			| MAPPING
 			| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index f05444008c..c161b4cd7a 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -161,6 +161,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
 PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
 PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
 PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
 PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -241,6 +242,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
 PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
 PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
 PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
 PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
 PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)

Reply via email to