Hello,
The attached patch adds error handling for the following cases:
extra data,
missing data,
invalid OID,
null OID, and
row field count mismatch.
A record that fails for any of the above reasons is written to the failed-record
file with the error message appended. For a unique violation or exclusion
constraint violation, the failed record is written unchanged, because the
specific cause of the error cannot be identified.
The new syntax is:
COPY ... WITH ON CONFLICT LOG maximum_error, LOG FILE NAME '…';
Regards
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 13a8b68d95..bf21abd8e0 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -364,6 +364,17 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
</listitem>
</varlistentry>
+ <varlistentry>
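+    <term><literal>ON CONFLICT LOG <replaceable class="parameter">maximum_errors</replaceable>, LOG FILE NAME '<replaceable class="parameter">filename</replaceable>'</literal></term>
+    <listitem>
+     <para>
+      Skip up to <replaceable class="parameter">maximum_errors</replaceable> bad input rows instead of raising an error.
+      Each skipped row is written to <replaceable class="parameter">filename</replaceable> (which must be an absolute path), followed by the error message where one is available,
+      and processing continues with the next row.
+     </para>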
+ </listitem>
+ </varlistentry>
+
</variablelist>
</refsect1>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9bc67ce60f..ffa6aecbd5 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -43,6 +43,7 @@
#include "port/pg_bswap.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
+#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -118,6 +119,7 @@ typedef struct CopyStateData
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+ FILE *failed_rec_file;	/* ON CONFLICT LOG output file, opened in BeginCopyFrom */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
@@ -147,6 +149,9 @@ typedef struct CopyStateData
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
+ char *failed_rec_filename;	/* ON CONFLICT LOG file name, or NULL if not given */
+ bool ignore_conflict;	/* was ON CONFLICT LOG specified? */
+ int error_limit; /* remaining # of bad rows that may still be logged and skipped */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
@@ -766,6 +771,21 @@ CopyLoadRawBuf(CopyState cstate)
return (inbytes > 0);
}
+/*
+ * LogCopyError
+ *		Write the current input line, followed by "str" and a line terminator,
+ *		to the failed-record file, and use up one unit of the error budget.
+ *
+ * "str" is appended to cstate->line_buf in place, so the caller must not use
+ * line_buf as input data afterwards (every caller moves on to the next line).
+ * A short write is reported as an error rather than silently losing records.
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+	Assert(cstate->failed_rec_file != NULL);
+
+	appendStringInfoString(&cstate->line_buf, str);
+#ifndef WIN32
+	appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+	appendStringInfoString(&cstate->line_buf, "\r\n");
+#endif
+	if (fwrite(cstate->line_buf.data, 1, cstate->line_buf.len,
+			   cstate->failed_rec_file) != (size_t) cstate->line_buf.len)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m",
+						cstate->failed_rec_filename)));
+	cstate->error_limit--;
+}
/*
* DoCopy executes the SQL COPY statement
@@ -1226,6 +1246,19 @@ ProcessCopyOptions(ParseState *pstate,
defel->defname),
parser_errposition(pstate, defel->location)));
}
+		else if (strcmp(defel->defname, "ignore_conflicts") == 0)
+		{
+			List	   *conflictOption;
+
+			if (cstate->ignore_conflict)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+
+			/*
+			 * The ON CONFLICT LOG grammar rule builds a two-element list
+			 * (Integer error limit, String file name).  The same option name
+			 * can also arrive through the generic "(name value)" option syntax
+			 * with some other shape, so validate before dereferencing it.
+			 */
+			conflictOption = (List *) defel->arg;
+			if (conflictOption == NULL || !IsA(conflictOption, List) ||
+				list_length(conflictOption) != 2 ||
+				!IsA(linitial(conflictOption), Integer) ||
+				!IsA(lsecond(conflictOption), String))
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("invalid argument for option \"%s\"",
+								defel->defname),
+						 parser_errposition(pstate, defel->location)));
+
+			/* Bad input rows can only occur while reading. */
+			if (!is_from)
+				ereport(ERROR,
+						(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						 errmsg("COPY ON CONFLICT LOG only available using COPY FROM"),
+						 parser_errposition(pstate, defel->location)));
+
+			/*
+			 * The failed-record file is written on the server as the server
+			 * user, even for COPY FROM STDIN, so require the same privilege
+			 * as COPY TO a server-side file.
+			 */
+			if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_WRITE_SERVER_FILES))
+				ereport(ERROR,
+						(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+						 errmsg("must be superuser or a member of the pg_write_server_files role to use COPY ON CONFLICT LOG")));
+
+			cstate->ignore_conflict = true;
+			cstate->error_limit = intVal(linitial(conflictOption));
+			if (cstate->error_limit <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("COPY ON CONFLICT LOG error limit must be greater than zero"),
+						 parser_errposition(pstate, defel->location)));
+			cstate->failed_rec_filename = strVal(lsecond(conflictOption));
+		}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1749,6 +1782,11 @@ EndCopy(CopyState cstate)
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
cstate->filename)));
+ if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->failed_rec_filename)));
}
MemoryContextDelete(cstate->copycontext);
@@ -2461,6 +2499,8 @@ CopyFrom(CopyState cstate)
hi_options |= HEAP_INSERT_FROZEN;
}
+ if (!cstate->ignore_conflict)
+ cstate->error_limit = 0;
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of code
@@ -2579,6 +2619,10 @@ CopyFrom(CopyState cstate)
*/
insertMethod = CIM_SINGLE;
}
+ else if (cstate->ignore_conflict)
+ {
+ insertMethod = CIM_SINGLE;
+ }
else
{
/*
@@ -2968,12 +3012,59 @@ CopyFrom(CopyState cstate)
*/
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
+				else if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					bool		specConflict = false;
+					uint32		specToken;
+
+					/*
+					 * Insert speculatively, so that a unique or exclusion
+					 * constraint violation can be detected and the tuple killed
+					 * instead of raising an error.
+					 */
+					specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+					HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+					/* use the same command ID as the regular heap_insert() path */
+					heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+								mycid,
+								HEAP_INSERT_SPECULATIVE,
+								NULL);
+
+					/* insert index entries, recording rather than raising conflicts */
+					recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+														   estate, true, &specConflict,
+														   NIL);
+
+					if (!specConflict)
+					{
+						heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+						processed++;
+					}
+					else
+					{
+						/*
+						 * Kill the tuple and log the input line unchanged:
+						 * ExecInsertIndexTuples() only reports that a conflict
+						 * happened, not which constraint caused it.
+						 *
+						 * NOTE(review): LogCopyError() decrements error_limit; if
+						 * it reaches 0 here, the "error_limit == 0" guard on index
+						 * insertion after this if/else chain becomes true for this
+						 * killed tuple.  Verify that path, and that the processed
+						 * counter is right once the budget is exhausted.
+						 */
+						heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+						LogCopyError(cstate, "");
+					}
+
+					/*
+					 * Wake up anyone waiting for our decision.  They will re-check
+					 * the tuple, see that it's no longer speculative, and wait on
+					 * our XID as if this was a regularly inserted tuple all along.
+					 */
+					SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+				}
else
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
mycid, hi_options, bistate);
/* And create index entries for it */
- if (resultRelInfo->ri_NumIndices > 0)
+ if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0)
recheckIndexes = ExecInsertIndexTuples(slot,
&(tuple->t_self),
estate,
@@ -2994,7 +3085,8 @@ CopyFrom(CopyState cstate)
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command.
*/
- processed++;
+ if(!cstate->ignore_conflict)
+ processed++;
}
}
@@ -3286,6 +3378,48 @@ BeginCopyFrom(ParseState *pstate,
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
+	if (cstate->failed_rec_filename)
+	{
+		mode_t		oumask;		/* Pre-existing umask value */
+		struct stat st;
+
+		/*
+		 * Skipping a bad row relies on the line-oriented text/CSV formats:
+		 * the raw input line is what gets written to the failed-record file,
+		 * and reading the next line is how COPY resynchronizes.  Binary COPY
+		 * has no line structure to log or to resynchronize on.
+		 */
+		if (cstate->binary)
+			ereport(ERROR,
+					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+					 errmsg("COPY ON CONFLICT LOG is not supported in binary mode")));
+
+		/*
+		 * Prevent write to relative path ... too easy to shoot oneself in
+		 * the foot by overwriting a database file ...
+		 */
+		if (!is_absolute_path(cstate->failed_rec_filename))
+			ereport(ERROR,
+					(errcode(ERRCODE_INVALID_NAME),
+					 errmsg("relative path not allowed for failed record file")));
+
+		/* Create the file without group/other write permission. */
+		oumask = umask(S_IWGRP | S_IWOTH);
+		PG_TRY();
+		{
+			cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+		}
+		PG_CATCH();
+		{
+			umask(oumask);
+			PG_RE_THROW();
+		}
+		PG_END_TRY();
+		umask(oumask);
+
+		if (cstate->failed_rec_file == NULL)
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not open file \"%s\" for writing: %m",
+							cstate->failed_rec_filename)));
+
+		if (fstat(fileno(cstate->failed_rec_file), &st))
+			ereport(ERROR,
+					(errcode_for_file_access(),
+					 errmsg("could not stat file \"%s\": %m",
+							cstate->failed_rec_filename)));
+
+		if (S_ISDIR(st.st_mode))
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					 errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+	}
+
if (data_source_cb)
{
cstate->copy_dest = COPY_CALLBACK;
@@ -3498,7 +3632,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
if (!cstate->binary)
{
char **field_strings;
@@ -3513,9 +3647,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* check for overflowing fields */
if (nfields > 0 && fldct > nfields)
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " extra data after last expected column");
+ goto next_line;
+ }else
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("extra data after last expected column")));
+ }
fieldno = 0;
@@ -3523,15 +3664,29 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
if (file_has_oids)
{
if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for OID column")));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " missing data for OID column");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("missing data for OID column")));
+ }
string = field_strings[fieldno++];
if (string == NULL)
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " null OID in COPY data");
+ goto next_line;
+ }else
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("null OID in COPY data")));
+ }
else if (cstate->oids && tupleOid != NULL)
{
cstate->cur_attname = "oid";
@@ -3539,9 +3694,17 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
*tupleOid = DatumGetObjectId(DirectFunctionCall1(oidin,
CStringGetDatum(string)));
if (*tupleOid == InvalidOid)
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " invalid OID in COPY data");
+ goto next_line;
+ }else
+
ereport(ERROR,
(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
errmsg("invalid OID in COPY data")));
+ }
cstate->cur_attname = NULL;
cstate->cur_attval = NULL;
}
@@ -3555,10 +3718,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
+			{
+				if (cstate->ignore_conflict && cstate->error_limit > 0)
+				{
+					/* log the row with the same message the error would carry */
+					LogCopyError(cstate,
+								 psprintf(" missing data for column \"%s\"",
+										  NameStr(att->attname)));
+					goto next_line;
+				}
 				ereport(ERROR,
 						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
 						 errmsg("missing data for column \"%s\"",
 								NameStr(att->attname))));
+			}
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
@@ -3645,10 +3818,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
}
if (fld_count != attr_count)
-		ereport(ERROR,
-				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-				 errmsg("row field count is %d, expected %d",
-						(int) fld_count, attr_count)));
+	{
+		/*
+		 * Deliberately not skippable under ON CONFLICT LOG: binary COPY has
+		 * no record delimiter, and this row's field data has not been read
+		 * yet, so jumping to the "next" row would misparse it.  There is also
+		 * no text line to write to the failed-record file.
+		 */
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+				 errmsg("row field count is %d, expected %d",
+						(int) fld_count, attr_count)));
+	}
if (file_has_oids)
{
@@ -3663,9 +3845,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
-1,
&isnull));
if (isnull || loaded_oid == InvalidOid)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("invalid OID in COPY data")));
+		{
+			/*
+			 * Deliberately not skippable under ON CONFLICT LOG: the remaining
+			 * binary fields of this row are still unread, so jumping to the
+			 * "next" row would misparse them.
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					 errmsg("invalid OID in COPY data")));
+		}
cstate->cur_attname = NULL;
if (cstate->oids && tupleOid != NULL)
*tupleOid = loaded_oid;
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 87f5e95827..c1084f71bc 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -632,7 +632,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
EXTENSION EXTERNAL EXTRACT
- FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+ FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -650,7 +650,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -3107,6 +3107,10 @@ copy_opt_item:
{
$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
}
+ | ON CONFLICT LOG_P Iconst ',' LOG_P FILE_P NAME_P Sconst
+ {
+ $$ = makeDefElem("ignore_conflicts", (Node *)list_make2(makeInteger($4), makeString($9)), @1);
+ }
;
/* The following exist for backward compatibility with very old versions */
@@ -15086,6 +15090,7 @@ unreserved_keyword:
| EXTENSION
| EXTERNAL
| FAMILY
+ | FILE_P
| FILTER
| FIRST_P
| FOLLOWING
@@ -15134,6 +15139,7 @@ unreserved_keyword:
| LOCATION
| LOCK_P
| LOCKED
+ | LOG_P
| LOGGED
| MAPPING
| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 23db40147b..442562b0fe 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -162,6 +162,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -242,6 +243,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)