On Wed, Dec 20, 2023 at 8:27 PM Masahiko Sawada <sawada.m...@gmail.com> wrote:
>
>
> Why do we need to use SPI? I think we can form heap tuples and insert
> them to the error table. Creating the error table also doesn't need to
> use SPI.
>
Thanks for pointing it out. I figured out how to form heap tuples and
insert them into the error table,
but I don't know how to create the error table without using SPI.
Could you point out how to do it?

> >
> > copy_errors one per schema.
> > foo.copy_errors will be owned by the schema: foo owner.
>
> It seems that the error table is created when the SAVE_ERROR is used
> for the first time. It probably blocks concurrent COPY FROM commands
> with SAVE_ERROR option to different tables if the error table is not
> created yet.
>
I don't know how to solve this problem. Maybe we can document it,
but it will block the COPY FROM immediately.

> >
> > if you can insert to a table in that specific schema let's say foo,
> > then you will get privilege to INSERT/DELETE/SELECT
> > to foo.copy_errors.
> > If you are not a superuser, you are only allowed to do
> > INSERT/DELETE/SELECT on foo.copy_errors rows where USERID =
> > current_user::regrole::oid.
> > This is done via row level security.
>
> I don't think it works. If the user is dropped, the user's oid could
> be reused for a different user.
>

You are right.
So I changed it: now the schema owner will be the error table owner.
For every error table tuple insert,
I switch to the schema owner, do the insert, then switch back to the
user running the COPY FROM operation.
Now everyone (except superusers) will need an explicit grant to access the
error table.
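
The switch, insert, restore sequence described above can be sketched as a
standalone toy. This is not the patch code: Oid is a local typedef here, and
get_user_id(), set_user_id() and insert_error_row() are hypothetical
stand-ins for GetUserIdAndSecContext(), SetUserIdAndSecContext() and
simple_heap_insert().

```c
#include <assert.h>

/* Local stand-in for PostgreSQL's Oid type (assumption for this sketch). */
typedef unsigned int Oid;

Oid current_user_id = 0;   /* user the "backend" is currently running as */
Oid last_insert_as = 0;    /* user that performed the most recent insert */
int rows_saved = 0;        /* number of error rows recorded so far */

/* Hypothetical stand-ins for GetUserIdAndSecContext / SetUserIdAndSecContext. */
Oid get_user_id(void) { return current_user_id; }
void set_user_id(Oid id) { current_user_id = id; }

/* Hypothetical stand-in for simple_heap_insert: records who did the insert. */
void insert_error_row(const char *message)
{
    (void) message;
    last_insert_as = get_user_id();
    rows_saved++;
}

/* Save one error row as the error table owner, then restore the caller. */
void save_error_as_owner(Oid owner, const char *message)
{
    Oid saved = get_user_id();

    set_user_id(owner);
    insert_error_row(message);
    set_user_id(saved);

    assert(get_user_id() == saved);
}
```

Calling save_error_as_owner(10, ...) while running as user 1001 leaves
last_insert_as at 10 and current_user_id back at 1001. Error handling and the
security context flags are not modeled.
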
From 8c8c266f1dc809ffa0ec9f4262bdd912ed6b758a Mon Sep 17 00:00:00 2001
From: pgaddict <jian.universal...@gmail.com>
Date: Wed, 27 Dec 2023 20:15:24 +0800
Subject: [PATCH v13 1/1] Make COPY FROM more error tolerant
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Currently COPY FROM has 3 types of error while processing the source file.
* extra data after last expected column
* missing data for column \"%s\"
* data type conversion error.

Instead of throwing an error while copying, the save_error option
saves errors to the table copy_errors, which is shared by all COPY FROM operations targeting the same schema.

The existing copy_errors table definition is checked by column name and column data type.
If the table already exists and meets the criteria, the error metadata is saved to copy_errors.
If the table does not exist, it is created.

There is one copy_errors table per schema, owned by the owner of the
COPY FROM destination schema.
The table owner has full privileges on copy_errors;
other non-superusers need to be granted privileges to access it.

Only works for COPY FROM, non-BINARY mode.
---
 doc/src/sgml/ref/copy.sgml               | 121 ++++++++++++-
 src/backend/commands/copy.c              |  12 ++
 src/backend/commands/copyfrom.c          | 133 +++++++++++++-
 src/backend/commands/copyfromparse.c     | 217 +++++++++++++++++++++--
 src/backend/parser/gram.y                |   8 +-
 src/bin/psql/tab-complete.c              |   3 +-
 src/include/commands/copy.h              |   1 +
 src/include/commands/copyfrom_internal.h |   6 +
 src/include/parser/kwlist.h              |   1 +
 src/test/regress/expected/copy2.out      | 137 ++++++++++++++
 src/test/regress/sql/copy2.sql           | 123 +++++++++++++
 11 files changed, 746 insertions(+), 16 deletions(-)

diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 18ecc69c..1d0ff0b6 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -44,6 +44,7 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     FORCE_NOT_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     FORCE_NULL { ( <replaceable class="parameter">column_name</replaceable> [, ...] ) | * }
     ENCODING '<replaceable class="parameter">encoding_name</replaceable>'
+    SAVE_ERROR [ <replaceable class="parameter">boolean</replaceable> ]
 </synopsis>
  </refsynopsisdiv>
 
@@ -411,6 +412,18 @@ WHERE <replaceable class="parameter">condition</replaceable>
     </listitem>
    </varlistentry>
 
+   <varlistentry>
+    <term><literal>SAVE_ERROR</literal></term>
+    <listitem>
+     <para>
+      Specifies that any data conversion errors while copying will automatically be saved in table <literal>COPY_ERRORS</literal> and the <command>COPY FROM</command> operation will not be interrupted by conversion errors.
+      This option is not allowed when using <literal>binary</literal> format. Note that this
+      is only supported in current <command>COPY FROM</command> syntax.
+      If this option is omitted, any data type conversion errors will be raised immediately.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect1>
 
@@ -564,6 +577,7 @@ COPY <replaceable class="parameter">count</replaceable>
     amount to a considerable amount of wasted disk space if the failure
     happened well into a large copy operation. You might wish to invoke
     <command>VACUUM</command> to recover the wasted space.
+    To continue copying while skipping conversion errors in a <command>COPY FROM</command>, you might wish to specify <literal>SAVE_ERROR</literal>.
    </para>
 
    <para>
@@ -572,6 +586,18 @@ COPY <replaceable class="parameter">count</replaceable>
     null strings to null values and unquoted null strings to empty strings.
    </para>
 
+   <para>
+    If the <literal>SAVE_ERROR</literal> option is specified and conversion errors occur while copying,
+    <productname>PostgreSQL</productname> will first check whether the table <literal>COPY_ERRORS</literal> exists, then save the conversion error information to it.
+    If it exists but its definition cannot be used to save the error information, an error is raised and the <command>COPY FROM</command> operation stops.
+    If it does not exist, <productname>PostgreSQL</productname> will try to create it before doing the actual copy operation.
+    The owner of the table <literal>COPY_ERRORS</literal> is the owner of the schema that contains the destination table.
+    All error information generated by later copies into the same schema will automatically be saved to the same <literal>COPY_ERRORS</literal> table.
+    Currently only the owner can read and write data to table <literal>COPY_ERRORS</literal>.
+    Conversion errors include data type conversion failures and extra or missing data in the source file.
+    The <literal>COPY_ERRORS</literal> table is described in detail in <xref linkend="copy-errors-table"/>.
+
+   </para>
  </refsect1>
 
  <refsect1>
@@ -588,7 +614,7 @@ COPY <replaceable class="parameter">count</replaceable>
     output function, or acceptable to the input function, of each
     attribute's data type.  The specified null string is used in
     place of columns that are null.
-    <command>COPY FROM</command> will raise an error if any line of the
+    By default, if <literal>SAVE_ERROR</literal> is not specified, <command>COPY FROM</command> will raise an error if any line of the
     input file contains more or fewer columns than are expected.
    </para>
 
@@ -962,6 +988,99 @@ versions of <productname>PostgreSQL</productname>.
      check against somehow getting out of sync with the data.
     </para>
    </refsect3>
+
+   <refsect3>
+    <title> TABLE COPY_ERRORS </title>
+    <para>
+        If <literal>SAVE_ERROR</literal> is specified, all data type conversion errors while copying will automatically be saved in <literal>COPY_ERRORS</literal>.
+        <xref linkend="copy-errors-table"/> shows the column names, data types, and descriptions of the <literal>COPY_ERRORS</literal> table.
+    </para>
+
+   <table id="copy-errors-table">
+    <title>Error Saving table description </title>
+
+    <tgroup cols="3">
+     <thead>
+      <row>
+       <entry>Column name</entry>
+       <entry>Data type</entry>
+       <entry>Description</entry>
+      </row>
+     </thead>
+
+      <tbody>
+       <row>
+       <entry> <literal>userid</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The user that generated the conversion error.
+       Refers to <link linkend="catalog-pg-authid"><structname>pg_authid</structname></link>.<structfield>oid</structfield>.
+       There is no hard dependency on <literal>pg_authid</literal>; if the corresponding <structfield>oid</structfield> is deleted from <literal>pg_authid</literal>, the value becomes stale.
+    </entry>
+       </row>
+
+       <row>
+       <entry> <literal>copy_destination</literal> </entry>
+       <entry><type>oid</type></entry>
+       <entry>The OID of the <command>COPY FROM</command> destination table.
+        Refers to <link linkend="catalog-pg-class"><structname>pg_class</structname></link>.<structfield>oid</structfield>.
+        There is no hard dependency on <literal>pg_class</literal>; if the corresponding <structfield>oid</structfield> is deleted from <literal>pg_class</literal>, the value becomes stale.
+
+        </entry>
+       </row>
+
+       <row>
+       <entry> <literal>filename</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The path name of the input file</entry>
+       </row>
+
+       <row>
+       <entry> <literal>lineno</literal> </entry>
+       <entry><type>bigint</type></entry>
+       <entry>Line number where the error occurred, counting from 1</entry>
+       </row>
+
+       <row>
+       <entry> <literal>line</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the line where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>colname</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>raw_field_value</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Raw content of the field where the error occurred</entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_message</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error message text </entry>
+       </row>
+
+       <row>
+       <entry> <literal>err_detail</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>Detailed error message </entry>
+       </row>
+
+       <row>
+       <entry> <literal>errorcode</literal> </entry>
+       <entry><type>text</type></entry>
+       <entry>The error code for the copying error</entry>
+       </row>
+
+      </tbody>
+     </tgroup>
+    </table>
+   </refsect3>
+
   </refsect2>
  </refsect1>
 
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b5..bc4af10a 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -419,6 +419,7 @@ ProcessCopyOptions(ParseState *pstate,
 	bool		format_specified = false;
 	bool		freeze_specified = false;
 	bool		header_specified = false;
+	bool		save_error_specified = false;
 	ListCell   *option;
 
 	/* Support external use for option sanity checking */
@@ -458,6 +459,13 @@ ProcessCopyOptions(ParseState *pstate,
 			freeze_specified = true;
 			opts_out->freeze = defGetBoolean(defel);
 		}
+		else if (strcmp(defel->defname, "save_error") == 0)
+		{
+			if (save_error_specified)
+				errorConflictingDefElem(defel, pstate);
+			save_error_specified = true;
+			opts_out->save_error = defGetBoolean(defel);
+		}
 		else if (strcmp(defel->defname, "delimiter") == 0)
 		{
 			if (opts_out->delim)
@@ -598,6 +606,10 @@ ProcessCopyOptions(ParseState *pstate,
 				(errcode(ERRCODE_SYNTAX_ERROR),
 				 errmsg("cannot specify DEFAULT in BINARY mode")));
 
+	if (opts_out->binary && opts_out->save_error)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("cannot specify SAVE_ERROR in BINARY mode")));
 	/* Set defaults for omitted options */
 	if (!opts_out->delim)
 		opts_out->delim = opts_out->csv_mode ? "," : "\t";
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f4861652..a972ad87 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -29,7 +29,9 @@
 #include "access/tableam.h"
 #include "access/xact.h"
 #include "access/xlog.h"
+#include "catalog/pg_authid.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_namespace.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
@@ -38,6 +40,7 @@
 #include "executor/executor.h"
 #include "executor/nodeModifyTable.h"
 #include "executor/tuptable.h"
+#include "executor/spi.h"
 #include "foreign/fdwapi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
@@ -52,6 +55,7 @@
 #include "utils/portal.h"
 #include "utils/rel.h"
 #include "utils/snapmgr.h"
+#include "utils/syscache.h"
 
 /*
  * No more than this many tuples per CopyMultiInsertBuffer
@@ -655,7 +659,8 @@ CopyFrom(CopyFromState cstate)
 
 	Assert(cstate->rel);
 	Assert(list_length(cstate->range_table) == 1);
-
+	if (cstate->opts.save_error)
+		Assert(cstate->escontext);
 	/*
 	 * The target must be a plain, foreign, or partitioned relation, or have
 	 * an INSTEAD OF INSERT row trigger.  (Currently, such triggers are only
@@ -992,6 +997,10 @@ CopyFrom(CopyFromState cstate)
 		if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
 			break;
 
+		/* Soft error occurred, skip this tuple. */
+		if (cstate->opts.save_error && cstate->line_error_occured)
+			continue;
+
 		ExecStoreVirtualTuple(myslot);
 
 		/*
@@ -1297,6 +1306,20 @@ CopyFrom(CopyFromState cstate)
 
 	ExecResetTupleTable(estate->es_tupleTable, false);
 
+	if (cstate->opts.save_error)
+	{
+		Assert(cstate->copy_errors_nspname);
+
+		if (cstate->error_rows_cnt > 0)
+		{
+			ereport(NOTICE,
+					errmsg("%llu rows were skipped because of conversion error."
+							" Skipped rows saved to table %s.copy_errors",
+							(unsigned long long) cstate->error_rows_cnt,
+							cstate->copy_errors_nspname));
+		}
+	}
+
 	/* Allow the FDW to shut down */
 	if (target_resultRelInfo->ri_FdwRoutine != NULL &&
 		target_resultRelInfo->ri_FdwRoutine->EndForeignInsert != NULL)
@@ -1444,6 +1467,114 @@ BeginCopyFrom(ParseState *pstate,
 		}
 	}
 
+	/* Set up soft error handler for SAVE_ERROR */
+	if (cstate->opts.save_error)
+	{
+		StringInfoData 	querybuf;
+		bool		isnull;
+		bool		copy_erros_table_ok;
+		Oid			nsp_oid;
+		Oid			save_userid;
+		Oid			ownerId;
+		int			save_sec_context;
+		const char	*copy_errors_nspname;
+		HeapTuple	tuple;
+
+		cstate->escontext = makeNode(ErrorSaveContext);
+		cstate->escontext->type = T_ErrorSaveContext;
+		cstate->escontext->details_wanted = true;
+		cstate->escontext->error_occurred = false;
+
+		copy_errors_nspname = get_namespace_name(RelationGetNamespace(cstate->rel));
+		nsp_oid = get_namespace_oid(copy_errors_nspname, false);
+
+		initStringInfo(&querybuf);
+		/*
+		*
+		* Verify whether the nsp_oid.COPY_ERRORS table already exists, and if so,
+		* examine its column names and data types.
+		*/
+		appendStringInfo(&querybuf,
+						"SELECT (array_agg(pa.attname ORDER BY pa.attnum) "
+							"= '{ctid,userid,copy_destination,filename,lineno, "
+							"line,colname,raw_field_value,err_message,err_detail,errorcode}') "
+							"AND (ARRAY_AGG(pt.typname ORDER BY pa.attnum) "
+							"= '{tid,oid,oid,text,int8,text,text,text,text,text,text}') "
+							"FROM pg_catalog.pg_attribute pa "
+							"JOIN pg_catalog.pg_class	pc ON pc.oid = pa.attrelid "
+							"JOIN pg_catalog.pg_type 	pt ON pt.oid = pa.atttypid "
+							"JOIN pg_catalog.pg_namespace pn "
+							"ON pn.oid = pc.relnamespace WHERE ");
+		appendStringInfo(&querybuf,
+							"relname = $$copy_errors$$ AND pn.nspname = $$%s$$ "
+							" AND pa.attnum >= -1 AND NOT attisdropped ",
+							copy_errors_nspname);
+
+		if (SPI_connect() != SPI_OK_CONNECT)
+			elog(ERROR, "SPI_connect failed");
+
+		if (SPI_execute(querybuf.data, false, 0) != SPI_OK_SELECT)
+			elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+		copy_erros_table_ok = DatumGetBool(SPI_getbinval(SPI_tuptable->vals[0],
+									   SPI_tuptable->tupdesc,
+									   1, &isnull));
+
+		tuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(nsp_oid));
+		if (!HeapTupleIsValid(tuple))
+				ereport(ERROR,
+						(errcode(ERRCODE_UNDEFINED_SCHEMA),
+						 errmsg("schema with OID %u does not exist", nsp_oid)));
+		ownerId = ((Form_pg_namespace) GETSTRUCT(tuple))->nspowner;
+		ReleaseSysCache(tuple);
+
+		cstate->copy_errors_owner = ownerId;
+
+		/*
+		* Switch to the schema owner's userid, so that the COPY_ERRORS table is
+		* owned by that user.
+		*/
+		GetUserIdAndSecContext(&save_userid, &save_sec_context);
+
+		SetUserIdAndSecContext(ownerId,
+							save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
+							SECURITY_NOFORCE_RLS);
+
+		/* If the copy_errors_nspname.COPY_ERRORS table does not exist, create it to hold all potential errors. */
+		if (isnull)
+		{
+			resetStringInfo(&querybuf);
+			appendStringInfo(&querybuf,
+					"CREATE TABLE %s.COPY_ERRORS( "
+					"USERID OID, COPY_DESTINATION OID, FILENAME TEXT,LINENO BIGINT "
+					",LINE TEXT, COLNAME text, RAW_FIELD_VALUE TEXT "
+					",ERR_MESSAGE TEXT, ERR_DETAIL TEXT, ERRORCODE TEXT)", copy_errors_nspname);
+
+			if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
+				elog(ERROR, "SPI_exec failed: %s", querybuf.data);
+		}
+		else if(!copy_erros_table_ok)
+			ereport(ERROR,
+					(errmsg("table %s.COPY_ERRORS already exists. "
+								 "cannot use it for COPY FROM error saving",
+								  copy_errors_nspname)));
+
+		if (SPI_finish() != SPI_OK_FINISH)
+			elog(ERROR, "SPI_finish failed");
+
+		/* Restore userid and security context */
+		SetUserIdAndSecContext(save_userid, save_sec_context);
+		cstate->copy_errors_nspname = pstrdup(copy_errors_nspname);
+	}
+	else
+	{
+		cstate->copy_errors_nspname = NULL;
+		cstate->escontext = NULL;
+		cstate->copy_errors_owner = (Oid) 0;
+	}
+
+	cstate->error_rows_cnt = 0;  		/* set the default to 0 */
+	cstate->line_error_occured = false;	/* default, assume conversion be ok. */
+
 	/* Convert convert_selectively name list to per-column flags */
 	if (cstate->opts.convert_selectively)
 	{
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f5537345..f0849725 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -58,18 +58,21 @@
  */
 #include "postgres.h"
 
+#include "access/heapam.h"
 #include <ctype.h>
 #include <unistd.h>
 #include <sys/stat.h>
-
+#include "catalog/namespace.h"
 #include "commands/copy.h"
 #include "commands/copyfrom_internal.h"
 #include "commands/progress.h"
 #include "executor/executor.h"
+#include "executor/spi.h"
 #include "libpq/libpq.h"
 #include "libpq/pqformat.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
+#include "nodes/miscnodes.h"
 #include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "utils/builtins.h"
@@ -880,16 +883,85 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 		int			fldct;
 		int			fieldno;
 		char	   *string;
+		char		*errmsg_extra;
+		Oid			save_userid = InvalidOid;
+		int			save_sec_context = -1;
+		HeapTuple	copy_errors_tup;
+		Relation	copy_errorsrel;
+		TupleDesc	copy_errors_tupDesc;
+		Datum		t_values[10];
+		bool		t_isnull[10];
 
 		/* read raw fields in the next line */
 		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
 			return false;
 
+		if (cstate->opts.save_error)
+		{
+			/*
+			* Open the copy_errors relation. We also need the current userid
+			* for the later heap inserts.
+			*/
+			copy_errorsrel = table_open(RelnameGetRelid("copy_errors"), RowExclusiveLock);
+			copy_errors_tupDesc = copy_errorsrel->rd_att;
+			GetUserIdAndSecContext(&save_userid, &save_sec_context);
+		}
+
+		/* reset line_error_occured to false for next new line. */
+		if (cstate->line_error_occured)
+			cstate->line_error_occured = false;
+
 		/* check for overflowing fields */
 		if (attr_count > 0 && fldct > attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+		{
+			if(cstate->opts.save_error)
+			{
+				errmsg_extra = pstrdup("extra data after last expected column");
+				t_values[0] = ObjectIdGetDatum(save_userid);
+				t_isnull[0] = false;
+				t_values[1] = ObjectIdGetDatum(cstate->rel->rd_rel->oid);
+				t_isnull[1] = false;
+				t_values[2] = CStringGetTextDatum(
+								cstate->filename ? cstate->filename : "STDIN");
+				t_isnull[2] = false;
+				t_values[3] = Int64GetDatum((long long) cstate->cur_lineno);
+				t_isnull[3] = false;
+				t_values[4] = CStringGetTextDatum(cstate->line_buf.data);
+				t_isnull[4] = false;
+				t_values[5] = (Datum) 0;
+				t_isnull[5] = true;
+				t_values[6] = (Datum) 0;
+				t_isnull[6] = true;
+				t_values[7] = CStringGetTextDatum(errmsg_extra);
+				t_isnull[7] = false;
+				t_values[8] = (Datum) 0;
+				t_isnull[8] = true;
+				t_values[9] = CStringGetTextDatum(
+							  unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT));
+				t_isnull[9] = false;
+
+				copy_errors_tup = heap_form_tuple(copy_errors_tupDesc,
+												 t_values,
+												 t_isnull);
+
+				/* using copy_errors owner do the simple_heap_insert */
+				SetUserIdAndSecContext(cstate->copy_errors_owner,
+									save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
+									SECURITY_NOFORCE_RLS);
+				simple_heap_insert(copy_errorsrel, copy_errors_tup);
+
+				/* Restore userid and security context */
+				SetUserIdAndSecContext(save_userid, save_sec_context);
+				cstate->line_error_occured = true;
+				cstate->error_rows_cnt++;
+				table_close(copy_errorsrel, RowExclusiveLock);
+				return true;
+			}
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						errmsg("extra data after last expected column")));
+		}
 
 		fieldno = 0;
 
@@ -901,10 +973,55 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
 			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
+			{
+				if(cstate->opts.save_error)
+				{
+					t_values[0] = ObjectIdGetDatum(save_userid);
+					t_isnull[0] = false;
+					t_values[1] = ObjectIdGetDatum(cstate->rel->rd_rel->oid);
+					t_isnull[1] = false;
+					t_values[2] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN");
+					t_isnull[2] = false;
+					t_values[3] = Int64GetDatum((long long) cstate->cur_lineno);
+					t_isnull[3] = false;
+					t_values[4] = CStringGetTextDatum(cstate->line_buf.data);
+					t_isnull[4] = false;
+					t_values[5] = (Datum) 0;
+					t_isnull[5] = true;
+					t_values[6] = (Datum) 0;
+					t_isnull[6] = true;
+					t_values[7] = CStringGetTextDatum(
+								psprintf("missing data for column \"%s\"", NameStr(att->attname)));
+					t_isnull[7] = false;
+					t_values[8] = (Datum) 0;
+					t_isnull[8] = true;
+					t_values[9] = CStringGetTextDatum(
+									unpack_sql_state(ERRCODE_BAD_COPY_FILE_FORMAT));
+					t_isnull[9] = false;
+
+					copy_errors_tup = heap_form_tuple(copy_errors_tupDesc,
+													t_values,
+													t_isnull);
+					/* using copy_errors owner do the simple_heap_insert */
+					SetUserIdAndSecContext(cstate->copy_errors_owner,
+										save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
+										SECURITY_NOFORCE_RLS);
+					simple_heap_insert(copy_errorsrel, copy_errors_tup);
+
+					/* Restore userid and security context */
+					SetUserIdAndSecContext(save_userid, save_sec_context);
+					cstate->line_error_occured = true;
+					cstate->error_rows_cnt++;
+					table_close(copy_errorsrel, RowExclusiveLock);
+					return true;
+				}
+				else
+					ereport(ERROR,
+							(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+							 errmsg("missing data for column \"%s\"",
+									NameStr(att->attname))));
+			}
+
 			string = field_strings[fieldno++];
 
 			if (cstate->convert_select_flags &&
@@ -956,15 +1073,91 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
 				values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
 			}
 			else
-				values[m] = InputFunctionCall(&in_functions[m],
-											  string,
-											  typioparams[m],
-											  att->atttypmod);
+			{
+				/*
+				*
+				* InputFunctionCall is faster than InputFunctionCallSafe.
+				*
+				*/
+				if(!cstate->opts.save_error)
+					values[m] = InputFunctionCall(&in_functions[m],
+												string,
+												typioparams[m],
+												att->atttypmod);
+				else
+				{
+					if (!InputFunctionCallSafe(&in_functions[m],
+											string,
+											typioparams[m],
+											att->atttypmod,
+											(Node *) cstate->escontext,
+											&values[m]))
+					{
+						char	*err_detail;
+						char	*err_code;
+						err_code = pstrdup(unpack_sql_state(cstate->escontext->error_data->sqlerrcode));
 
+						if (!cstate->escontext->error_data->detail)
+							err_detail = NULL;
+						else
+							err_detail = cstate->escontext->error_data->detail;
+
+						t_values[0] = ObjectIdGetDatum(save_userid);
+						t_isnull[0] = false;
+						t_values[1] = ObjectIdGetDatum(cstate->rel->rd_rel->oid);
+						t_isnull[1] = false;
+						t_values[2] = CStringGetTextDatum(cstate->filename ? cstate->filename : "STDIN");
+						t_isnull[2] = false;
+						t_values[3] = Int64GetDatum((long long) cstate->cur_lineno);
+						t_isnull[3] = false;
+						t_values[4] = CStringGetTextDatum(cstate->line_buf.data);
+						t_isnull[4] = false;
+						t_values[5] = CStringGetTextDatum(cstate->cur_attname);
+						t_isnull[5] = false;
+						t_values[6] = CStringGetTextDatum(string);
+						t_isnull[6] = false;
+						t_values[7] = CStringGetTextDatum(cstate->escontext->error_data->message);
+						t_isnull[7] = false;
+						t_values[8] = err_detail ? CStringGetTextDatum(err_detail) : (Datum) 0;
+						t_isnull[8] = err_detail ? false: true;
+						t_values[9] = CStringGetTextDatum(err_code);
+						t_isnull[9] = false;
+
+						copy_errors_tup = heap_form_tuple(copy_errors_tupDesc,
+														t_values,
+														t_isnull);
+						/* using copy_errors owner do the simple_heap_insert */
+						SetUserIdAndSecContext(cstate->copy_errors_owner,
+											save_sec_context | SECURITY_LOCAL_USERID_CHANGE |
+											SECURITY_NOFORCE_RLS);
+
+						simple_heap_insert(copy_errorsrel, copy_errors_tup);
+
+						/* Restore userid and security context */
+						SetUserIdAndSecContext(save_userid, save_sec_context);
+
+						/* line error occurred, set it once per line */
+						if (!cstate->line_error_occured)
+							cstate->line_error_occured = true;
+						/* reset ErrorSaveContext */
+						cstate->escontext->error_occurred = false;
+						cstate->escontext->details_wanted = true;
+						memset(cstate->escontext->error_data,0, sizeof(ErrorData));
+					}
+				}
+			}
 			cstate->cur_attname = NULL;
 			cstate->cur_attval = NULL;
 		}
 
+		/* record error rows count. */
+		if (cstate->line_error_occured)
+		{
+			cstate->error_rows_cnt++;
+			Assert(cstate->opts.save_error);
+		}
+		if (cstate->opts.save_error)
+			table_close(copy_errorsrel, RowExclusiveLock);
 		Assert(fieldno == attr_count);
 	}
 	else
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 63f172e1..f42e72aa 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -755,7 +755,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 	RESET RESTART RESTRICT RETURN RETURNING RETURNS REVOKE RIGHT ROLE ROLLBACK ROLLUP
 	ROUTINE ROUTINES ROW ROWS RULE
 
-	SAVEPOINT SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT
+	SAVEPOINT SAVE_ERROR SCALAR SCHEMA SCHEMAS SCROLL SEARCH SECOND_P SECURITY SELECT
 	SEQUENCE SEQUENCES
 	SERIALIZABLE SERVER SESSION SESSION_USER SET SETS SETOF SHARE SHOW
 	SIMILAR SIMPLE SKIP SMALLINT SNAPSHOT SOME SQL_P STABLE STANDALONE_P
@@ -3448,6 +3448,10 @@ copy_opt_item:
 				{
 					$$ = makeDefElem("encoding", (Node *) makeString($2), @1);
 				}
+			| SAVE_ERROR
+				{
+					$$ = makeDefElem("save_error", (Node *) makeBoolean(true), @1);
+				}
 		;
 
 /* The following exist for backward compatibility with very old versions */
@@ -17346,6 +17350,7 @@ unreserved_keyword:
 			| ROWS
 			| RULE
 			| SAVEPOINT
+			| SAVE_ERROR
 			| SCALAR
 			| SCHEMA
 			| SCHEMAS
@@ -17954,6 +17959,7 @@ bare_label_keyword:
 			| ROWS
 			| RULE
 			| SAVEPOINT
+			| SAVE_ERROR
 			| SCALAR
 			| SCHEMA
 			| SCHEMAS
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 04980118..e6a358e0 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2890,7 +2890,8 @@ psql_completion(const char *text, int start, int end)
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "("))
 		COMPLETE_WITH("FORMAT", "FREEZE", "DELIMITER", "NULL",
 					  "HEADER", "QUOTE", "ESCAPE", "FORCE_QUOTE",
-					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT");
+					  "FORCE_NOT_NULL", "FORCE_NULL", "ENCODING", "DEFAULT",
+					  "SAVE_ERROR");
 
 	/* Complete COPY <sth> FROM|TO filename WITH (FORMAT */
 	else if (Matches("COPY|\\copy", MatchAny, "FROM|TO", MatchAny, "WITH", "(", "FORMAT"))
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b9..aa560dbb 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -43,6 +43,7 @@ typedef struct CopyFormatOptions
 	bool		binary;			/* binary format? */
 	bool		freeze;			/* freeze rows on loading? */
 	bool		csv_mode;		/* Comma Separated Value format? */
+	bool		save_error;		/* save errors to a table? */
 	CopyHeaderChoice header_line;	/* header line? */
 	char	   *null_print;		/* NULL marker string (server encoding!) */
 	int			null_print_len; /* length of same */
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 5ec41589..2c3b7b42 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -16,6 +16,7 @@
 
 #include "commands/copy.h"
 #include "commands/trigger.h"
+#include "nodes/miscnodes.h"
 
 /*
  * Represents the different source cases we need to worry about at
@@ -94,6 +95,11 @@ typedef struct CopyFromStateData
 								 * default value */
 	FmgrInfo   *in_functions;	/* array of input functions for each attrs */
 	Oid		   *typioparams;	/* array of element types for in_functions */
+	Oid		   copy_errors_owner;	/* the owner of copy_errors table */
+	ErrorSaveContext *escontext; 	/* soft error trapper during in_functions execution */
+	uint64		error_rows_cnt; /* total number of rows that have errors */
+	const char 	*copy_errors_nspname;		/* the copy_errors's namespace */
+	bool	 	line_error_occured;	/* did a conversion error occur on this line? */
 	int		   *defmap;			/* array of default att numbers related to
 								 * missing att */
 	ExprState **defexprs;		/* array of default att expressions for all
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 5984dcfa..d0988a4c 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -377,6 +377,7 @@ PG_KEYWORD("routines", ROUTINES, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("row", ROW, COL_NAME_KEYWORD, BARE_LABEL)
 PG_KEYWORD("rows", ROWS, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("rule", RULE, UNRESERVED_KEYWORD, BARE_LABEL)
+PG_KEYWORD("save_error", SAVE_ERROR, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("savepoint", SAVEPOINT, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("scalar", SCALAR, UNRESERVED_KEYWORD, BARE_LABEL)
 PG_KEYWORD("schema", SCHEMA, UNRESERVED_KEYWORD, BARE_LABEL)
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index c4178b9c..a2c6bf5aa 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -564,6 +564,118 @@ ERROR:  conflicting or redundant options
 LINE 1: ... b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL...
                                                              ^
 ROLLBACK;
+--
+-- tests for SAVE_ERROR option with force_not_null, force_null
+\pset null NULL
+CREATE TABLE save_error_csv(
+    a INT NOT NULL,
+    b TEXT NOT NULL,
+    c TEXT
+);
+--save_error not allowed in binary mode
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary);
+ERROR:  cannot specify SAVE_ERROR in BINARY mode
+-- redundant options not allowed.
+COPY save_error_csv FROM STDIN WITH (save_error, save_error off);
+ERROR:  conflicting or redundant options
+LINE 1: COPY save_error_csv FROM STDIN WITH (save_error, save_error ...
+                                                         ^
+create table COPY_ERRORS();
+--should fail. since table COPY_ERRORS already exists.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error);
+ERROR:  table public.COPY_ERRORS already exists. cannot use it for COPY FROM error saving
+drop table COPY_ERRORS;
+--with FORCE_NOT_NULL and FORCE_NULL.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c));
+NOTICE:  2 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors
+SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv;
+ a | b |  c   | b_null | b_empty 
+---+---+------+--------+---------
+ 2 |   | NULL | f      | t
+(1 row)
+
+DROP TABLE save_error_csv;
+-- save error with extra data and missing data some column.
+---normal data type conversion error case.
+CREATE TABLE check_ign_err (n int, m int[], k bigint, l text);
+COPY check_ign_err FROM STDIN WITH (save_error);
+NOTICE:  10 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors
+select	pc.relname, ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+from	copy_errors ce	join pg_class pc on pc.oid = ce.copy_destination
+where 	pc.relname = 'check_ign_err';
+    relname    | filename | lineno |                    line                    | colname |     raw_field_value     |                           err_message                           |        err_detail         | errorcode 
+---------------+----------+--------+--------------------------------------------+---------+-------------------------+-----------------------------------------------------------------+---------------------------+-----------
+ check_ign_err | STDIN    |      1 | 1       {1}     1       1       extra      | NULL    | NULL                    | extra data after last expected column                           | NULL                      | 22P04
+ check_ign_err | STDIN    |      2 | 2                                          | NULL    | NULL                    | missing data for column "m"                                     | NULL                      | 22P04
+ check_ign_err | STDIN    |      3 | \n      {1}     1       \-                 | n       |                        +| invalid input syntax for type integer: "                       +| NULL                      | 22P02
+               |          |        |                                            |         |                         | "                                                               |                           | 
+ check_ign_err | STDIN    |      4 | a       {2}     2       \r                 | n       | a                       | invalid input syntax for type integer: "a"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      5 | 3       {\3}    3333333333      \n         | m       | {\x03}                  | invalid input syntax for type integer: "\x03"                   | NULL                      | 22P02
+ check_ign_err | STDIN    |      6 | 0x11    {3,}    3333333333      \\.        | m       | {3,}                    | malformed array literal: "{3,}"                                 | Unexpected "}" character. | 22P02
+ check_ign_err | STDIN    |      7 | d       {3,1/}  3333333333      \\0        | n       | d                       | invalid input syntax for type integer: "d"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      7 | d       {3,1/}  3333333333      \\0        | m       | {3,1/}                  | invalid input syntax for type integer: "1/"                     | NULL                      | 22P02
+ check_ign_err | STDIN    |      8 | e       {3,\1}  -3323879289873933333333 \n | n       | e                       | invalid input syntax for type integer: "e"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      8 | e       {3,\1}  -3323879289873933333333 \n | m       | {3,\x01}                | invalid input syntax for type integer: "\x01"                   | NULL                      | 22P02
+ check_ign_err | STDIN    |      8 | e       {3,\1}  -3323879289873933333333 \n | k       | -3323879289873933333333 | value "-3323879289873933333333" is out of range for type bigint | NULL                      | 22003
+ check_ign_err | STDIN    |      9 | f       {3,1}   3323879289873933333333  \r | n       | f                       | invalid input syntax for type integer: "f"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |      9 | f       {3,1}   3323879289873933333333  \r | k       | 3323879289873933333333  | value "3323879289873933333333" is out of range for type bigint  | NULL                      | 22003
+ check_ign_err | STDIN    |     10 | b       {a, 4}  1.1     h                  | n       | b                       | invalid input syntax for type integer: "b"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |     10 | b       {a, 4}  1.1     h                  | m       | {a, 4}                  | invalid input syntax for type integer: "a"                      | NULL                      | 22P02
+ check_ign_err | STDIN    |     10 | b       {a, 4}  1.1     h                  | k       | 1.1                     | invalid input syntax for type bigint: "1.1"                     | NULL                      | 22P02
+(16 rows)
+
+DROP TABLE check_ign_err;
+truncate COPY_ERRORS;
+--(type textrange was already made in test_setup.sql)
+--using textrange doing test
+begin;
+CREATE USER regress_user12;
+CREATE USER regress_user13;
+CREATE SCHEMA IF NOT EXISTS copy_errors_test AUTHORIZATION regress_user12;
+SET LOCAL search_path TO copy_errors_test;
+GRANT USAGE on schema copy_errors_test to regress_user12,regress_user13;
+GRANT CREATE on schema copy_errors_test to regress_user12;
+set role regress_user12;
+CREATE TABLE textrange_input(a public.textrange, b public.textrange, c public.textrange);
+GRANT insert on textrange_input to regress_user13;
+set role regress_user13;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+NOTICE:  2 rows were skipped because of conversion error. Skipped rows saved to table copy_errors_test.copy_errors
+SAVEPOINT s1;
+--should fail. no priviledge
+select * from copy_errors_test.copy_errors;
+ERROR:  permission denied for table copy_errors
+ROLLBACK to s1;
+set role regress_user12;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+NOTICE:  2 rows were skipped because of conversion error. Skipped rows saved to table copy_errors_test.copy_errors
+SELECT	pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+FROM	copy_errors_test.copy_errors ce
+JOIN 	pg_class pc ON pc.oid = ce.copy_destination
+JOIN 	pg_roles pr ON pr.oid = ce.userid;
+     relname     |    rolname     | filename | lineno |            line            | colname | raw_field_value |                            err_message                            |                err_detail                | errorcode 
+-----------------+----------------+----------+--------+----------------------------+---------+-----------------+-------------------------------------------------------------------+------------------------------------------+-----------
+ textrange_input | regress_user13 | STDIN    |      1 | ,-[a\","z),[a","-inf)      | b       | -[a\,z)         | malformed range literal: "-[a\,z)"                                | Missing left parenthesis or bracket.     | 22P02
+ textrange_input | regress_user13 | STDIN    |      1 | ,-[a\","z),[a","-inf)      | c       | [a,-inf)        | range lower bound must be less than or equal to range upper bound | NULL                                     | 22000
+ textrange_input | regress_user13 | STDIN    |      2 | (",a),(",",a),()",a);      | a       | (,a),(          | malformed range literal: "(,a),("                                 | Junk after right parenthesis or bracket. | 22P02
+ textrange_input | regress_user13 | STDIN    |      2 | (",a),(",",a),()",a);      | b       | ,a),()          | malformed range literal: ",a),()"                                 | Missing left parenthesis or bracket.     | 22P02
+ textrange_input | regress_user13 | STDIN    |      2 | (",a),(",",a),()",a);      | c       | a);             | malformed range literal: "a);"                                    | Missing left parenthesis or bracket.     | 22P02
+ textrange_input | regress_user12 | STDIN    |      1 | (a",")),(]","a),(a","])    | a       | (a,))           | malformed range literal: "(a,))"                                  | Junk after right parenthesis or bracket. | 22P02
+ textrange_input | regress_user12 | STDIN    |      1 | (a",")),(]","a),(a","])    | b       | (],a)           | malformed range literal: "(],a)"                                  | Missing comma after lower bound.         | 22P02
+ textrange_input | regress_user12 | STDIN    |      1 | (a",")),(]","a),(a","])    | c       | (a,])           | malformed range literal: "(a,])"                                  | Junk after right parenthesis or bracket. | 22P02
+ textrange_input | regress_user12 | STDIN    |      2 | [z","a],[z","2],[(","",")] | a       | [z,a]           | range lower bound must be less than or equal to range upper bound | NULL                                     | 22000
+ textrange_input | regress_user12 | STDIN    |      2 | [z","a],[z","2],[(","",")] | b       | [z,2]           | range lower bound must be less than or equal to range upper bound | NULL                                     | 22000
+ textrange_input | regress_user12 | STDIN    |      2 | [z","a],[z","2],[(","",")] | c       | [(,",)]         | malformed range literal: "[(,",)]"                                | Unexpected end of input.                 | 22P02
+(11 rows)
+
+--owner allowed to drop the table.
+drop table copy_errors;
+--should fail. no priviledge
+select * from public.copy_errors;
+ERROR:  permission denied for table copy_errors
+ROLLBACK;
 \pset null ''
 -- test case with whole-row Var in a check constraint
 create table check_con_tbl (f1 int);
@@ -822,3 +934,28 @@ truncate copy_default;
 -- DEFAULT cannot be used in COPY TO
 copy (select 1 as test) TO stdout with (default '\D');
 ERROR:  COPY DEFAULT only available using COPY FROM
+-- DEFAULT WITH SAVE_ERROR.
+create table copy_default_error_save (
+	id integer,
+	text_value text not null default 'test',
+	ts_value timestamp without time zone not null default '2022-07-05'
+);
+copy copy_default_error_save from stdin with (save_error, default '\D');
+NOTICE:  3 rows were skipped because of conversion error. Skipped rows saved to table public.copy_errors
+select 	ce.filename,ce.lineno,ce.line,
+		ce.colname, ce.raw_field_value,
+		ce.err_message, ce.err_detail,ce.errorcode
+from 	public.copy_errors ce
+join	pg_class	pc	on pc.oid = ce.copy_destination
+where	pc.relname = 'copy_default_error_save'
+order	by lineno, colname;
+ filename | lineno |               line               | colname  | raw_field_value  |                         err_message                         | err_detail | errorcode 
+----------+--------+----------------------------------+----------+------------------+-------------------------------------------------------------+------------+-----------
+ STDIN    |      1 | k       value   '2022-07-04'     | id       | k                | invalid input syntax for type integer: "k"                  |            | 22P02
+ STDIN    |      2 | z       \D      '2022-07-03ASKL' | id       | z                | invalid input syntax for type integer: "z"                  |            | 22P02
+ STDIN    |      2 | z       \D      '2022-07-03ASKL' | ts_value | '2022-07-03ASKL' | invalid input syntax for type timestamp: "'2022-07-03ASKL'" |            | 22007
+ STDIN    |      3 | s       \D      \D               | id       | s                | invalid input syntax for type integer: "s"                  |            | 22P02
+(4 rows)
+
+drop table copy_default_error_save, copy_errors;
+truncate copy_default;
diff --git a/src/test/regress/sql/copy2.sql b/src/test/regress/sql/copy2.sql
index a5486f60..a37986df 100644
--- a/src/test/regress/sql/copy2.sql
+++ b/src/test/regress/sql/copy2.sql
@@ -374,6 +374,106 @@ BEGIN;
 COPY forcetest (a, b, c) FROM STDIN WITH (FORMAT csv, FORCE_NULL *, FORCE_NULL(b));
 ROLLBACK;
 
+--
+-- tests for SAVE_ERROR option with force_not_null, force_null
+\pset null NULL
+CREATE TABLE save_error_csv(
+    a INT NOT NULL,
+    b TEXT NOT NULL,
+    c TEXT
+);
+
+--save_error not allowed in binary mode
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT binary);
+
+-- redundant options not allowed.
+COPY save_error_csv FROM STDIN WITH (save_error, save_error off);
+
+create table COPY_ERRORS();
+--should fail. since table COPY_ERRORS already exists.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error);
+
+drop table COPY_ERRORS;
+
+--with FORCE_NOT_NULL and FORCE_NULL.
+COPY save_error_csv (a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NOT_NULL(b), FORCE_NULL(c));
+z,,""
+\0,,
+2,,
+\.
+
+SELECT *, b is null as b_null, b = '' as b_empty FROM save_error_csv;
+DROP TABLE save_error_csv;
+
+-- save error with extra data and missing data some column.
+---normal data type conversion error case.
+CREATE TABLE check_ign_err (n int, m int[], k bigint, l text);
+COPY check_ign_err FROM STDIN WITH (save_error);
+1	{1}	1	1	extra
+2
+\n	{1}	1	\-
+a	{2}	2	\r
+3	{\3}	3333333333	\n
+0x11	{3,}	3333333333	\\.
+d	{3,1/}	3333333333	\\0
+e	{3,\1}	-3323879289873933333333	\n
+f	{3,1}	3323879289873933333333	\r
+b	{a, 4}	1.1	h
+5	{5}	5	\\
+\.
+
+select	pc.relname, ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+from	copy_errors ce	join pg_class pc on pc.oid = ce.copy_destination
+where 	pc.relname = 'check_ign_err';
+
+DROP TABLE check_ign_err;
+truncate COPY_ERRORS;
+
+--(type textrange was already made in test_setup.sql)
+--using textrange doing test
+begin;
+CREATE USER regress_user12;
+CREATE USER regress_user13;
+CREATE SCHEMA IF NOT EXISTS copy_errors_test AUTHORIZATION regress_user12;
+SET LOCAL search_path TO copy_errors_test;
+
+GRANT USAGE on schema copy_errors_test to regress_user12,regress_user13;
+GRANT CREATE on schema copy_errors_test to regress_user12;
+set role regress_user12;
+CREATE TABLE textrange_input(a public.textrange, b public.textrange, c public.textrange);
+GRANT insert on textrange_input to regress_user13;
+
+set role regress_user13;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+,-[a\","z),[a","-inf)
+(",a),(",",a),()",a);
+\.
+
+SAVEPOINT s1;
+--should fail. no priviledge
+select * from copy_errors_test.copy_errors;
+
+ROLLBACK to s1;
+
+set role regress_user12;
+COPY textrange_input(a, b, c) FROM STDIN WITH (save_error,FORMAT csv, FORCE_NULL *);
+(a",")),(]","a),(a","])
+[z","a],[z","2],[(","",")]
+\.
+
+SELECT	pc.relname,pr.rolname,ce.filename,ce.lineno,ce.line,ce.colname,
+		ce.raw_field_value,ce.err_message,ce.err_detail,ce.errorcode
+FROM	copy_errors_test.copy_errors ce
+JOIN 	pg_class pc ON pc.oid = ce.copy_destination
+JOIN 	pg_roles pr ON pr.oid = ce.userid;
+
+--owner allowed to drop the table.
+drop table copy_errors;
+
+--should fail. no priviledge
+select * from public.copy_errors;
+ROLLBACK;
 \pset null ''
 
 -- test case with whole-row Var in a check constraint
@@ -609,3 +709,26 @@ truncate copy_default;
 
 -- DEFAULT cannot be used in COPY TO
 copy (select 1 as test) TO stdout with (default '\D');
+
+-- DEFAULT WITH SAVE_ERROR.
+create table copy_default_error_save (
+	id integer,
+	text_value text not null default 'test',
+	ts_value timestamp without time zone not null default '2022-07-05'
+);
+copy copy_default_error_save from stdin with (save_error, default '\D');
+k	value	'2022-07-04'
+z	\D	'2022-07-03ASKL'
+s	\D	\D
+\.
+
+select 	ce.filename,ce.lineno,ce.line,
+		ce.colname, ce.raw_field_value,
+		ce.err_message, ce.err_detail,ce.errorcode
+from 	public.copy_errors ce
+join	pg_class	pc	on pc.oid = ce.copy_destination
+where	pc.relname = 'copy_default_error_save'
+order	by lineno, colname;
+
+drop table copy_default_error_save, copy_errors;
+truncate copy_default;
\ No newline at end of file
-- 
2.34.1
