Hi,

With the current file_fdw, if data conversion fails for even a single line, none of the file's contents can be read:

  =# \! cat data/test.data
  1,a
  2,b
  a,c
  =# create foreign table f_fdw_test_1 (i int, t text) server f_fdw options (filename 'test.data', format 'csv');
  CREATE FOREIGN TABLE

  =# table f_fdw_test_1;
  ERROR:  invalid input syntax for type integer: "a"
  CONTEXT:  COPY f_fdw_test_1, line 3, column i: "a"

Since v17 will support the ON_ERROR option, which tolerates data conversion errors in COPY FROM, and the LOG_VERBOSITY option[1], how about supporting them in file_fdw as well?

This idea comes from Fujii-san[2], and I think it would be useful when reading somewhat dirty data.

The attached PoC patch works as follows:

  =# create foreign table f_fdw_test_2 (i int, t text) server f_fdw options (filename 'test.data', format 'csv', on_error 'ignore');
  CREATE FOREIGN TABLE

  =# table f_fdw_test_2;
  NOTICE:  1 row was skipped due to data type incompatibility
   i | t
  ---+---
   1 | a
   2 | b
  (2 rows)


  =# create foreign table f_fdw_test_3 (i int, t text) server f_fdw options (filename 'test.data', format 'csv', on_error 'ignore', log_verbosity 'verbose');
  CREATE FOREIGN TABLE

  =# table f_fdw_test_3 ;
  NOTICE:  skipping row due to data type incompatibility at line 3 for column i: "a"
  NOTICE:  1 row was skipped due to data type incompatibility
   i | t
  ---+---
   1 | a
   2 | b
  (2 rows)


I will continue developing the patch (e.g., adding documentation and measuring performance degradation) if people agree that this feature is worth adding.


What do you think?


[1] https://www.postgresql.org/docs/devel/sql-copy.html
[2] https://x.com/fujii_masao/status/1808178032219509041

--
Regards,

--
Atsushi Torikoshi
NTT DATA Group Corporation
From b6ec598bfdd64833e0bffc889a11addc5d677b51 Mon Sep 17 00:00:00 2001
From: Atsushi Torikoshi <torikos...@oss.nttdata.com>
Date: Fri, 5 Jul 2024 00:07:26 +0900
Subject: [PATCH v1] PoC patch for adding on_error and log_verbosity options to
 file_fdw

---
 contrib/file_fdw/expected/file_fdw.out | 20 +++++++++
 contrib/file_fdw/file_fdw.c            | 58 +++++++++++++++++++++++---
 contrib/file_fdw/sql/file_fdw.sql      |  6 +++
 3 files changed, 78 insertions(+), 6 deletions(-)

diff --git a/contrib/file_fdw/expected/file_fdw.out b/contrib/file_fdw/expected/file_fdw.out
index 86c148a86b..1af79af20f 100644
--- a/contrib/file_fdw/expected/file_fdw.out
+++ b/contrib/file_fdw/expected/file_fdw.out
@@ -206,6 +206,26 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 SELECT * FROM agg_bad;               -- ERROR
 ERROR:  invalid input syntax for type real: "aaa"
 CONTEXT:  COPY agg_bad, line 3, column b: "aaa"
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+NOTICE:  1 row was skipped due to data type incompatibility
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'verbose');
+SELECT * FROM agg_bad;
+NOTICE:  skipping row due to data type incompatibility at line 3 for column b: "aaa"
+NOTICE:  1 row was skipped due to data type incompatibility
+  a  |   b    
+-----+--------
+ 100 | 99.097
+  42 | 324.78
+(2 rows)
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');
diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 249d82d3a0..86fb655df1 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -22,8 +22,10 @@
 #include "catalog/pg_authid.h"
 #include "catalog/pg_foreign_table.h"
 #include "commands/copy.h"
+#include "commands/copyfrom_internal.h"
 #include "commands/defrem.h"
 #include "commands/explain.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "foreign/fdwapi.h"
 #include "foreign/foreign.h"
@@ -34,6 +36,7 @@
 #include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
 #include "utils/acl.h"
+#include "utils/backend_progress.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 #include "utils/sampling.h"
@@ -74,6 +77,8 @@ static const struct FileFdwOption valid_options[] = {
 	{"null", ForeignTableRelationId},
 	{"default", ForeignTableRelationId},
 	{"encoding", ForeignTableRelationId},
+	{"on_error", ForeignTableRelationId},
+	{"log_verbosity", ForeignTableRelationId},
 	{"force_not_null", AttributeRelationId},
 	{"force_null", AttributeRelationId},
 
@@ -724,12 +729,13 @@ fileIterateForeignScan(ForeignScanState *node)
 	ExprContext *econtext;
 	MemoryContext oldcontext;
 	TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
-	bool		found;
+	CopyFromState cstate = festate->cstate;
+	int64	skipped = 0;
 	ErrorContextCallback errcallback;
 
 	/* Set up callback to identify error line number. */
 	errcallback.callback = CopyFromErrorCallback;
-	errcallback.arg = (void *) festate->cstate;
+	errcallback.arg = (void *) cstate;
 	errcallback.previous = error_context_stack;
 	error_context_stack = &errcallback;
 
@@ -750,10 +756,40 @@ fileIterateForeignScan(ForeignScanState *node)
 	 * switch in case we are doing that.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-	found = NextCopyFrom(festate->cstate, econtext,
-						 slot->tts_values, slot->tts_isnull);
-	if (found)
+
+	for(;;)
+	{
+		if (!NextCopyFrom(cstate, econtext,
+					 slot->tts_values, slot->tts_isnull))
+			break;
+
+		if (cstate->opts.on_error != COPY_ON_ERROR_STOP &&
+			cstate->escontext->error_occurred)
+		{
+			/*
+			 * Soft error occurred, skip this tuple and deal with error
+			 * information according to ON_ERROR.
+			 */
+			if (cstate->opts.on_error == COPY_ON_ERROR_IGNORE)
+
+				/*
+				 * Just make ErrorSaveContext ready for the next NextCopyFrom.
+				 * Since we don't set details_wanted and error_data is not to
+				 * be filled, just resetting error_occurred is enough.
+				 */
+				cstate->escontext->error_occurred = false;
+
+			/* Report that this tuple was skipped by the ON_ERROR clause */
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_SKIPPED,
+										 ++skipped);
+
+			/* Repeat NextCopyFrom() until no soft error occurs */
+			continue;
+		}
+
 		ExecStoreVirtualTuple(slot);
+		break;
+	}
 
 	/* Switch back to original memory context */
 	MemoryContextSwitchTo(oldcontext);
@@ -795,7 +831,17 @@ fileEndForeignScan(ForeignScanState *node)
 	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
 
 	/* if festate is NULL, we are in EXPLAIN; nothing to do */
-	if (festate)
+	if (!festate)
+		return;
+
+	if (festate->cstate->opts.on_error != COPY_ON_ERROR_STOP &&
+		festate->cstate->num_errors > 0)
+		ereport(NOTICE,
+				errmsg_plural("%llu row was skipped due to data type incompatibility",
+							  "%llu rows were skipped due to data type incompatibility",
+							  (unsigned long long) festate->cstate->num_errors,
+							  (unsigned long long) festate->cstate->num_errors));
+
 		EndCopyFrom(festate->cstate);
 }
 
diff --git a/contrib/file_fdw/sql/file_fdw.sql b/contrib/file_fdw/sql/file_fdw.sql
index f0548e14e1..5eae01d0f2 100644
--- a/contrib/file_fdw/sql/file_fdw.sql
+++ b/contrib/file_fdw/sql/file_fdw.sql
@@ -150,6 +150,12 @@ SELECT * FROM agg_csv c JOIN agg_text t ON (t.a = c.a) ORDER BY c.a;
 -- error context report tests
 SELECT * FROM agg_bad;               -- ERROR
 
+-- on_error and log_verbosity tests
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD on_error 'ignore');
+SELECT * FROM agg_bad;
+ALTER FOREIGN TABLE agg_bad OPTIONS (ADD log_verbosity 'verbose');
+SELECT * FROM agg_bad;
+
 -- misc query tests
 \t on
 SELECT explain_filter('EXPLAIN (VERBOSE, COSTS FALSE) SELECT * FROM agg_csv');

base-commit: 2ef575c7803a55101352c0f6cb8f745af063a66c
-- 
2.39.2

Reply via email to