On Thu, Nov 29, 2018 at 3:15 PM Dmitry Dolgov <[email protected]> wrote:
>
> Unfortunately, the patch conflict-handling-onCopy-from-v2.patch has some
> conflicts now, could you rebase it?
>
Thank you for letting me know. Attached is the patch, rebased against current master.
Regards
Surafel
diff --git a/doc/src/sgml/ref/copy.sgml b/doc/src/sgml/ref/copy.sgml
index 411941ed31..33015451a5 100644
--- a/doc/src/sgml/ref/copy.sgml
+++ b/doc/src/sgml/ref/copy.sgml
@@ -353,6 +353,28 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
</listitem>
</varlistentry>
+ <varlistentry>
+ <term><literal>on_conflict_log</literal></term>
+ <listitem>
+ <para>
+ Specifies the maximum number of erroneous records to log.
+ Instead of raising an error, the record is written to the log file
+ and <command>COPY</command> proceeds to the next record.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term><literal>log_file_name</literal></term>
+ <listitem>
+ <para>
+ The path name of the log file. It must be an absolute
+ path. Windows users might need to use an <literal>E''</literal> string and
+ double any backslashes used in the path name.
+ </para>
+ </listitem>
+ </varlistentry>
+
</variablelist>
</refsect1>
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 4311e16007..b4b707c3f6 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -44,6 +44,7 @@
#include "port/pg_bswap.h"
#include "rewrite/rewriteHandler.h"
#include "storage/fd.h"
+#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -121,6 +122,7 @@ typedef struct CopyStateData
int file_encoding; /* file or remote side's character encoding */
bool need_transcoding; /* file encoding diff from server? */
bool encoding_embeds_ascii; /* ASCII can be non-first byte? */
+ FILE *failed_rec_file; /* used if ignore_conflict is true */
/* parameters from the COPY command */
Relation rel; /* relation to copy to or from */
@@ -149,6 +151,9 @@ typedef struct CopyStateData
bool convert_selectively; /* do selective binary conversion? */
List *convert_select; /* list of column names (can be NIL) */
bool *convert_select_flags; /* per-column CSV/TEXT CS flags */
+ char *failed_rec_filename; /* failed record filename */
+ bool ignore_conflict;
+ int error_limit; /* total # of error to log */
/* these are just for error messages, see CopyFromErrorCallback */
const char *cur_relname; /* table name for error messages */
@@ -769,6 +774,21 @@ CopyLoadRawBuf(CopyState cstate)
return (inbytes > 0);
}
+/*
+ * LogCopyError - record a rejected input line in the failed-record file
+ *
+ * Appends the message "str" followed by a platform line terminator to
+ * cstate->line_buf (presumably still holding the input line being
+ * processed -- confirm against callers), writes the whole buffer to
+ * cstate->failed_rec_file, and consumes one unit of cstate->error_limit.
+ *
+ * A failed or short write is reported as an ERROR; otherwise rejected
+ * records could be dropped from the log without anyone noticing.
+ */
+static void
+LogCopyError(CopyState cstate, const char *str)
+{
+	appendBinaryStringInfo(&cstate->line_buf, str, strlen(str));
+#ifndef WIN32
+	appendStringInfoCharMacro(&cstate->line_buf, '\n');
+#else
+	appendBinaryStringInfo(&cstate->line_buf, "\r\n", strlen("\r\n"));
+#endif
+	/* fwrite returns the number of bytes written since the item size is 1 */
+	if (fwrite(cstate->line_buf.data, 1, cstate->line_buf.len,
+			   cstate->failed_rec_file) != (size_t) cstate->line_buf.len)
+		ereport(ERROR,
+				(errcode_for_file_access(),
+				 errmsg("could not write to file \"%s\": %m",
+						cstate->failed_rec_filename)));
+	cstate->error_limit--;
+}
/*
* DoCopy executes the SQL COPY statement
@@ -1223,6 +1243,32 @@ ProcessCopyOptions(ParseState *pstate,
defel->defname),
parser_errposition(pstate, defel->location)));
}
+		else if (strcmp(defel->defname, "on_conflict_log") == 0)
+		{
+			/* on_conflict_log may be given at most once */
+			if (cstate->ignore_conflict)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options"),
+						 parser_errposition(pstate, defel->location)));
+
+			cstate->ignore_conflict = true;
+
+			/*
+			 * error_limit is a plain int, so read the option as a 32-bit
+			 * integer rather than fetching an int64 and truncating it on
+			 * assignment.
+			 */
+			cstate->error_limit = defGetInt32(defel);
+
+			/*
+			 * Zero is rejected too: the message promises a positive number,
+			 * and a zero limit would set ignore_conflict without allowing
+			 * any record to be logged.
+			 */
+			if (cstate->error_limit <= 0)
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("argument to option \"%s\" must be positive number",
+								defel->defname),
+						 parser_errposition(pstate, defel->location)));
+		}
+ else if (strcmp(defel->defname, "log_file_name") == 0)
+ {
+ if (cstate->failed_rec_filename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("conflicting or redundant options"),
+ parser_errposition(pstate, defel->location)));
+ cstate->failed_rec_filename =defGetString(defel);
+ }
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
@@ -1245,6 +1291,21 @@ ProcessCopyOptions(ParseState *pstate,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("cannot specify NULL in BINARY mode")));
+ if (!cstate->error_limit && cstate->failed_rec_filename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify log file name without on conflict log option")));
+
+ if (cstate->error_limit && !cstate->failed_rec_filename)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify on conflict log without log file name option")));
+
+ if (cstate->error_limit && !cstate->is_copy_from)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("cannot specify on conflict log on COPY TO")));
+
/* Set defaults for omitted options */
if (!cstate->delim)
cstate->delim = cstate->csv_mode ? "," : "\t";
@@ -1745,6 +1806,11 @@ EndCopy(CopyState cstate)
(errcode_for_file_access(),
errmsg("could not close file \"%s\": %m",
cstate->filename)));
+ if (cstate->failed_rec_filename != NULL && FreeFile(cstate->failed_rec_file))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m",
+ cstate->failed_rec_filename)));
}
MemoryContextDelete(cstate->copycontext);
@@ -2461,6 +2527,8 @@ CopyFrom(CopyState cstate)
hi_options |= HEAP_INSERT_FROZEN;
}
+ if (!cstate->ignore_conflict)
+ cstate->error_limit = 0;
/*
* We need a ResultRelInfo so we can use the regular executor's
* index-entry-making machinery. (There used to be a huge amount of code
@@ -2575,6 +2643,10 @@ CopyFrom(CopyState cstate)
*/
insertMethod = CIM_SINGLE;
}
+ else if (cstate->ignore_conflict)
+ {
+ insertMethod = CIM_SINGLE;
+ }
else
{
/*
@@ -2946,12 +3018,59 @@ CopyFrom(CopyState cstate)
*/
tuple->t_tableOid = RelationGetRelid(resultRelInfo->ri_RelationDesc);
}
+ else if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ bool specConflict;
+ uint32 specToken;
+ specConflict = false;
+
+ specToken = SpeculativeInsertionLockAcquire(GetCurrentTransactionId());
+ HeapTupleHeaderSetSpeculativeToken(tuple->t_data, specToken);
+
+ /* insert the tuple, with the speculative token */
+ heap_insert(resultRelInfo->ri_RelationDesc, tuple,
+ estate->es_output_cid,
+ HEAP_INSERT_SPECULATIVE,
+ NULL);
+
+ /* insert index entries for tuple */
+ recheckIndexes = ExecInsertIndexTuples(slot, &(tuple->t_self),
+ estate, true, &specConflict,
+ NIL);
+
+ /* adjust the tuple's state accordingly */
+ if (!specConflict)
+ {
+ heap_finish_speculative(resultRelInfo->ri_RelationDesc, tuple);
+ processed++;
+ }
+					else
+					{
+						/*
+						 * ExecInsertIndexTuples reported a conflict: abort the
+						 * speculative insertion and log the input line instead.
+						 *
+						 * LogCopyError() appends the line terminator, writes
+						 * line_buf to the failed-record file and decrements
+						 * error_limit.  An empty message is passed so only the
+						 * input line itself is logged.  Using the helper also
+						 * avoids the open-coded WIN32 branch, which referenced
+						 * the nonexistent member "cstate->cstate".
+						 */
+						heap_abort_speculative(resultRelInfo->ri_RelationDesc, tuple);
+						LogCopyError(cstate, "");
+					}
+
+ /*
+ * Wake up anyone waiting for our decision. They will re-check
+ * the tuple, see that it's no longer speculative, and wait on our
+ * XID as if this was a regularly inserted tuple all along.
+ */
+ SpeculativeInsertionLockRelease(GetCurrentTransactionId());
+
+ }
else
heap_insert(resultRelInfo->ri_RelationDesc, tuple,
mycid, hi_options, bistate);
/* And create index entries for it */
- if (resultRelInfo->ri_NumIndices > 0)
+ if (resultRelInfo->ri_NumIndices > 0 && cstate->error_limit == 0)
recheckIndexes = ExecInsertIndexTuples(slot,
&(tuple->t_self),
estate,
@@ -2972,7 +3091,8 @@ CopyFrom(CopyState cstate)
* or FDW; this is the same definition used by nodeModifyTable.c
* for counting tuples inserted by an INSERT command.
*/
- processed++;
+ if(!cstate->ignore_conflict)
+ processed++;
}
}
@@ -3260,6 +3380,48 @@ BeginCopyFrom(ParseState *pstate,
cstate->num_defaults = num_defaults;
cstate->is_program = is_program;
+ if (cstate->failed_rec_filename)
+ {
+ mode_t oumask; /* Pre-existing umask value */
+ struct stat st;
+ /*
+ * Prevent write to relative path ... too easy to shoot oneself in
+ * the foot by overwriting a database file ...
+ */
+ if (!is_absolute_path(cstate->failed_rec_filename))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_NAME),
+ errmsg("relative path not allowed for failed record file")));
+ oumask = umask(S_IWGRP | S_IWOTH);
+ PG_TRY();
+ {
+ cstate->failed_rec_file = AllocateFile(cstate->failed_rec_filename, PG_BINARY_W);
+ }
+ PG_CATCH();
+ {
+ umask(oumask);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ umask(oumask);
+ if (cstate->failed_rec_file == NULL)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\" for writing: %m",
+ cstate->failed_rec_filename)));
+
+ if (fstat(fileno(cstate->failed_rec_file), &st))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not stat file \"%s\": %m",
+ cstate->failed_rec_filename)));
+
+ if (S_ISDIR(st.st_mode))
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("\"%s\" is a directory", cstate->failed_rec_filename)));
+ }
+
if (data_source_cb)
{
cstate->copy_dest = COPY_CALLBACK;
@@ -3458,7 +3620,7 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* Initialize all values for row to NULL */
MemSet(values, 0, num_phys_attrs * sizeof(Datum));
MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-
+next_line:
if (!cstate->binary)
{
char **field_strings;
@@ -3473,9 +3635,16 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
/* check for overflowing fields */
if (attr_count > 0 && fldct > attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("extra data after last expected column")));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ LogCopyError(cstate, " extra data after last expected column");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("extra data after last expected column")));
+ }
fieldno = 0;
@@ -3487,10 +3656,20 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
Form_pg_attribute att = TupleDescAttr(tupDesc, m);
if (fieldno >= fldct)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("missing data for column \"%s\"",
- NameStr(att->attname))));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ appendStringInfo(&cstate->line_buf, " missing data for column %s",
+ NameStr(att->attname));
+ LogCopyError(cstate, " ");
+ goto next_line;
+ }else
+
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("missing data for column \"%s\"",
+ NameStr(att->attname))));
+ }
string = field_strings[fieldno++];
if (cstate->convert_select_flags &&
@@ -3577,10 +3756,19 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
}
if (fld_count != attr_count)
- ereport(ERROR,
- (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
- errmsg("row field count is %d, expected %d",
- (int) fld_count, attr_count)));
+ {
+ if (cstate->ignore_conflict && cstate->error_limit > 0)
+ {
+ appendStringInfo(&cstate->line_buf, "row field count is %d, expected %d",
+ (int) fld_count, attr_count);
+ LogCopyError(cstate, " ");
+ goto next_line;
+ }else
+ ereport(ERROR,
+ (errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+ errmsg("row field count is %d, expected %d",
+ (int) fld_count, attr_count)));
+ }
i = 0;
foreach(cur, cstate->attnumlist)
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index 2c2208ffb7..ecfa5f9874 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -632,7 +632,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
EXCLUDE EXCLUDING EXCLUSIVE EXECUTE EXISTS EXPLAIN
EXTENSION EXTERNAL EXTRACT
- FALSE_P FAMILY FETCH FILTER FIRST_P FLOAT_P FOLLOWING FOR
+ FALSE_P FAMILY FETCH FILE_P FILTER FIRST_P FLOAT_P FOLLOWING FOR
FORCE FOREIGN FORWARD FREEZE FROM FULL FUNCTION FUNCTIONS
GENERATED GLOBAL GRANT GRANTED GREATEST GROUP_P GROUPING GROUPS
@@ -650,7 +650,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
LABEL LANGUAGE LARGE_P LAST_P LATERAL_P
LEADING LEAKPROOF LEAST LEFT LEVEL LIKE LIMIT LISTEN LOAD LOCAL
- LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOGGED
+ LOCALTIME LOCALTIMESTAMP LOCATION LOCK_P LOCKED LOG_P LOGGED
MAPPING MATCH MATERIALIZED MAXVALUE METHOD MINUTE_P MINVALUE MODE MONTH_P MOVE
@@ -3093,6 +3093,14 @@ copy_opt_item:
{
$$ = makeDefElem("encoding", (Node *)makeString($2), @1);
}
+ | ON CONFLICT LOG_P Iconst
+ {
+ $$ = makeDefElem("on_conflict_log", (Node *)makeInteger($4), @1);
+ }
+ | LOG_P FILE_P NAME_P Sconst
+ {
+ $$ = makeDefElem("log_file_name", (Node *)makeString($4), @1);
+ }
;
/* The following exist for backward compatibility with very old versions */
@@ -15052,6 +15060,7 @@ unreserved_keyword:
| EXTENSION
| EXTERNAL
| FAMILY
+ | FILE_P
| FILTER
| FIRST_P
| FOLLOWING
@@ -15100,6 +15109,7 @@ unreserved_keyword:
| LOCATION
| LOCK_P
| LOCKED
+ | LOG_P
| LOGGED
| MAPPING
| MATCH
diff --git a/src/include/parser/kwlist.h b/src/include/parser/kwlist.h
index 23db40147b..442562b0fe 100644
--- a/src/include/parser/kwlist.h
+++ b/src/include/parser/kwlist.h
@@ -162,6 +162,7 @@ PG_KEYWORD("extract", EXTRACT, COL_NAME_KEYWORD)
PG_KEYWORD("false", FALSE_P, RESERVED_KEYWORD)
PG_KEYWORD("family", FAMILY, UNRESERVED_KEYWORD)
PG_KEYWORD("fetch", FETCH, RESERVED_KEYWORD)
+PG_KEYWORD("file", FILE_P, UNRESERVED_KEYWORD)
PG_KEYWORD("filter", FILTER, UNRESERVED_KEYWORD)
PG_KEYWORD("first", FIRST_P, UNRESERVED_KEYWORD)
PG_KEYWORD("float", FLOAT_P, COL_NAME_KEYWORD)
@@ -242,6 +243,7 @@ PG_KEYWORD("localtimestamp", LOCALTIMESTAMP, RESERVED_KEYWORD)
PG_KEYWORD("location", LOCATION, UNRESERVED_KEYWORD)
PG_KEYWORD("lock", LOCK_P, UNRESERVED_KEYWORD)
PG_KEYWORD("locked", LOCKED, UNRESERVED_KEYWORD)
+PG_KEYWORD("log", LOG_P, UNRESERVED_KEYWORD)
PG_KEYWORD("logged", LOGGED, UNRESERVED_KEYWORD)
PG_KEYWORD("mapping", MAPPING, UNRESERVED_KEYWORD)
PG_KEYWORD("match", MATCH, UNRESERVED_KEYWORD)