From df9e25b8517ab5dc6dd9f73ed7ad91fc20b1f938 Mon Sep 17 00:00:00 2001
From: Zhao Junwang <zhjwpku@gmail.com>
Date: Wed, 6 Dec 2023 19:13:22 +0800
Subject: [PATCH v4] Extract COPY handlers

---
 src/backend/commands/copy.c          |  44 ++++
 src/backend/commands/copyfrom.c      | 275 ++++++++++++---------
 src/backend/commands/copyfromparse.c | 309 ++++++++++++-----------
 src/backend/commands/copyto.c        | 354 +++++++++++++++------------
 src/include/commands/copy.h          |  49 +++-
 5 files changed, 619 insertions(+), 412 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..6ae904c1b8 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -427,6 +427,8 @@ ProcessCopyOptions(ParseState *pstate,
 
 	opts_out->file_encoding = -1;
 
+	/* Text is the default format. */
+	opts_out->handler = CopyHandlerOpsText;
 	/* Extract options from the statement node tree */
 	foreach(option, options)
 	{
@@ -442,9 +444,15 @@ ProcessCopyOptions(ParseState *pstate,
 			if (strcmp(fmt, "text") == 0)
 				 /* default format */ ;
 			else if (strcmp(fmt, "csv") == 0)
+			{
 				opts_out->csv_mode = true;
+				opts_out->handler = CopyHandlerOpsCSV;
+			}
 			else if (strcmp(fmt, "binary") == 0)
+			{
 				opts_out->binary = true;
+				opts_out->handler = CopyHandlerOpsBinary;
+			}
 			else
 				ereport(ERROR,
 						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -864,3 +872,39 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
 
 	return attnums;
 }
+
+const CopyHandlerOps CopyHandlerOpsText = {
+	.copy_to_start = CopyToFormatTextStart,
+	.copy_to_one_row = CopyToFormatTextOneRow,
+	.copy_to_end = CopyToFormatTextEnd,
+	.copy_from_start = CopyFromFormatTextStart,
+	.copy_from_next = CopyFromFormatTextNext,
+	.copy_from_error_callback = CopyFromFormatTextErrorCallback,
+	.copy_from_end = NULL,
+};
+
+/*
+ * We can use the same callbacks for both "text" and "csv" because the
+ * CopyToFormatText*() and CopyFromFormatText*() functions check
+ * cstate->opts.csv_mode and adjust their behavior accordingly.  We can
+ * split the implementations and stop referring to csv_mode later.
+ */
+const CopyHandlerOps CopyHandlerOpsCSV = {
+	.copy_to_start = CopyToFormatTextStart,
+	.copy_to_one_row = CopyToFormatTextOneRow,
+	.copy_to_end = CopyToFormatTextEnd,
+	.copy_from_start = CopyFromFormatTextStart,
+	.copy_from_next = CopyFromFormatTextNext,
+	.copy_from_error_callback = CopyFromFormatTextErrorCallback,
+	.copy_from_end = NULL,
+};
+
+const CopyHandlerOps CopyHandlerOpsBinary = {
+	.copy_to_start = CopyToFormatBinaryStart,
+	.copy_to_one_row = CopyToFormatBinaryOneRow,
+	.copy_to_end = CopyToFormatBinaryEnd,
+	.copy_from_start = CopyFromFormatBinaryStart,
+	.copy_from_next = CopyFromFormatBinaryNext,
+	.copy_from_error_callback = CopyFromFormatBinaryErrorCallback,
+	.copy_from_end = NULL,
+};
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f4861652a9..01c3c1c84f 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,6 +107,71 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
+void
+CopyFromFormatBinaryErrorCallback(CopyFromState cstate)
+{
+	/* can't usefully display the data */
+	if (cstate->cur_attname)
+		errcontext("COPY %s, line %llu, column %s",
+					cstate->cur_relname,
+					(unsigned long long) cstate->cur_lineno,
+					cstate->cur_attname);
+	else
+		errcontext("COPY %s, line %llu",
+					cstate->cur_relname,
+					(unsigned long long) cstate->cur_lineno);
+}
+
+void
+CopyFromFormatTextErrorCallback(CopyFromState cstate)
+{
+	if (cstate->cur_attname && cstate->cur_attval)
+	{
+		/* error is relevant to a particular column */
+		char	   *attval;
+
+		attval = limit_printout_length(cstate->cur_attval);
+		errcontext("COPY %s, line %llu, column %s: \"%s\"",
+					cstate->cur_relname,
+					(unsigned long long) cstate->cur_lineno,
+					cstate->cur_attname,
+					attval);
+		pfree(attval);
+	}
+	else if (cstate->cur_attname)
+	{
+		/* error is relevant to a particular column, value is NULL */
+		errcontext("COPY %s, line %llu, column %s: null input",
+					cstate->cur_relname,
+					(unsigned long long) cstate->cur_lineno,
+					cstate->cur_attname);
+	}
+	else
+	{
+		/*
+		 * Error is relevant to a particular line.
+		 *
+		 * If line_buf still contains the correct line, print it.
+		 */
+		if (cstate->line_buf_valid)
+		{
+			char	   *lineval;
+
+			lineval = limit_printout_length(cstate->line_buf.data);
+			errcontext("COPY %s, line %llu: \"%s\"",
+						cstate->cur_relname,
+						(unsigned long long) cstate->cur_lineno, lineval);
+			pfree(lineval);
+		}
+		else
+		{
+			errcontext("COPY %s, line %llu",
+						cstate->cur_relname,
+						(unsigned long long) cstate->cur_lineno);
+		}
+	}
+}
+
 /*
  * error context callback for COPY FROM
  *
@@ -123,67 +188,7 @@ CopyFromErrorCallback(void *arg)
 				   cstate->cur_relname);
 		return;
 	}
-	if (cstate->opts.binary)
-	{
-		/* can't usefully display the data */
-		if (cstate->cur_attname)
-			errcontext("COPY %s, line %llu, column %s",
-					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno,
-					   cstate->cur_attname);
-		else
-			errcontext("COPY %s, line %llu",
-					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno);
-	}
-	else
-	{
-		if (cstate->cur_attname && cstate->cur_attval)
-		{
-			/* error is relevant to a particular column */
-			char	   *attval;
-
-			attval = limit_printout_length(cstate->cur_attval);
-			errcontext("COPY %s, line %llu, column %s: \"%s\"",
-					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno,
-					   cstate->cur_attname,
-					   attval);
-			pfree(attval);
-		}
-		else if (cstate->cur_attname)
-		{
-			/* error is relevant to a particular column, value is NULL */
-			errcontext("COPY %s, line %llu, column %s: null input",
-					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno,
-					   cstate->cur_attname);
-		}
-		else
-		{
-			/*
-			 * Error is relevant to a particular line.
-			 *
-			 * If line_buf still contains the correct line, print it.
-			 */
-			if (cstate->line_buf_valid)
-			{
-				char	   *lineval;
-
-				lineval = limit_printout_length(cstate->line_buf.data);
-				errcontext("COPY %s, line %llu: \"%s\"",
-						   cstate->cur_relname,
-						   (unsigned long long) cstate->cur_lineno, lineval);
-				pfree(lineval);
-			}
-			else
-			{
-				errcontext("COPY %s, line %llu",
-						   cstate->cur_relname,
-						   (unsigned long long) cstate->cur_lineno);
-			}
-		}
-	}
+	cstate->opts.handler.copy_from_error_callback(cstate);
 }
 
 /*
@@ -1320,6 +1325,101 @@ CopyFrom(CopyFromState cstate)
 	return processed;
 }
 
+void
+CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	FmgrInfo   *in_functions;
+	Oid		   *typioparams;
+	Oid			in_func_oid;
+	AttrNumber	num_phys_attrs;
+
+	/*
+	 * Pick up the required catalog information for each attribute in the
+	 * relation: the binary input function and the type I/O parameter to
+	 * pass to it.  Dropped attributes are skipped, so their array entries
+	 * are not filled in.  (Defaults are set up by BeginCopyFrom.)
+	 */
+	num_phys_attrs = tupDesc->natts;
+	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+	for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+		/* We don't need info for dropped attributes */
+		if (att->attisdropped)
+			continue;
+
+		/* Fetch the input function and typioparam info */
+		getTypeBinaryInputInfo(att->atttypid,
+							   &in_func_oid, &typioparams[attnum - 1]);
+
+		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+	}
+	cstate->in_functions = in_functions;
+	cstate->typioparams = typioparams;
+}
+
+void
+CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	FmgrInfo   *in_functions;
+	Oid		   *typioparams;
+	Oid			in_func_oid;
+	AttrNumber	attr_count,
+				num_phys_attrs;
+
+	num_phys_attrs = tupDesc->natts;
+
+	/*
+	 * If encoding conversion is needed, we need another buffer to hold
+	 * the converted input data.  Otherwise, we can just point input_buf
+	 * to the same buffer as raw_buf.
+	 */
+	if (cstate->need_transcoding)
+	{
+		cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+		cstate->input_buf_index = cstate->input_buf_len = 0;
+	}
+	else
+		cstate->input_buf = cstate->raw_buf;
+	cstate->input_reached_eof = false;
+
+	initStringInfo(&cstate->line_buf);
+
+	/* create workspace for CopyReadAttributes results */
+	attr_count = list_length(cstate->attnumlist);
+
+	cstate->max_fields = attr_count;
+	cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+
+	/*
+	 * Pick up the required catalog information for each attribute in the
+	 * relation: the text input function and the type I/O parameter to
+	 * pass to it.  Dropped attributes are skipped, so their array entries
+	 * are not filled in.  (Defaults are set up by BeginCopyFrom.)
+	 */
+	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+	for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+		/* We don't need info for dropped attributes */
+		if (att->attisdropped)
+			continue;
+
+		/* Fetch the input function and typioparam info */
+		getTypeInputInfo(att->atttypid,
+						 &in_func_oid, &typioparams[attnum - 1]);
+		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+	}
+	cstate->in_functions = in_functions;
+	cstate->typioparams = typioparams;
+}
+
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -1348,9 +1448,6 @@ BeginCopyFrom(ParseState *pstate,
 	TupleDesc	tupDesc;
 	AttrNumber	num_phys_attrs,
 				num_defaults;
-	FmgrInfo   *in_functions;
-	Oid		   *typioparams;
-	Oid			in_func_oid;
 	int		   *defmap;
 	ExprState **defexprs;
 	MemoryContext oldcontext;
@@ -1518,25 +1615,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
 	cstate->raw_reached_eof = false;
 
-	if (!cstate->opts.binary)
-	{
-		/*
-		 * If encoding conversion is needed, we need another buffer to hold
-		 * the converted input data.  Otherwise, we can just point input_buf
-		 * to the same buffer as raw_buf.
-		 */
-		if (cstate->need_transcoding)
-		{
-			cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-			cstate->input_buf_index = cstate->input_buf_len = 0;
-		}
-		else
-			cstate->input_buf = cstate->raw_buf;
-		cstate->input_reached_eof = false;
-
-		initStringInfo(&cstate->line_buf);
-	}
-
 	initStringInfo(&cstate->attribute_buf);
 
 	/* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1546,17 +1624,10 @@ BeginCopyFrom(ParseState *pstate,
 		cstate->rteperminfos = pstate->p_rteperminfos;
 	}
 
+	cstate->opts.handler.copy_from_start(cstate, tupDesc);
+
 	num_defaults = 0;
 	volatile_defexprs = false;
-
-	/*
-	 * Pick up the required catalog information for each attribute in the
-	 * relation, including the input function, the element type (to pass to
-	 * the input function), and info about defaults and constraints. (Which
-	 * input function we use depends on text/binary format choice.)
-	 */
-	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
 
@@ -1568,15 +1639,6 @@ BeginCopyFrom(ParseState *pstate,
 		if (att->attisdropped)
 			continue;
 
-		/* Fetch the input function and typioparam info */
-		if (cstate->opts.binary)
-			getTypeBinaryInputInfo(att->atttypid,
-								   &in_func_oid, &typioparams[attnum - 1]);
-		else
-			getTypeInputInfo(att->atttypid,
-							 &in_func_oid, &typioparams[attnum - 1]);
-		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-
 		/* Get default info if available */
 		defexprs[attnum - 1] = NULL;
 
@@ -1636,8 +1698,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->bytes_processed = 0;
 
 	/* We keep those variables in cstate. */
-	cstate->in_functions = in_functions;
-	cstate->typioparams = typioparams;
 	cstate->defmap = defmap;
 	cstate->defexprs = defexprs;
 	cstate->volatile_defexprs = volatile_defexprs;
@@ -1716,15 +1776,6 @@ BeginCopyFrom(ParseState *pstate,
 		ReceiveCopyBinaryHeader(cstate);
 	}
 
-	/* create workspace for CopyReadAttributes results */
-	if (!cstate->opts.binary)
-	{
-		AttrNumber	attr_count = list_length(cstate->attnumlist);
-
-		cstate->max_fields = attr_count;
-		cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-	}
-
 	MemoryContextSwitchTo(oldcontext);
 
 	return cstate;
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f553734582..e840ebb108 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -839,187 +839,208 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	return true;
 }
 
-/*
- * Read next tuple from file for COPY FROM. Return false if no more tuples.
- *
- * 'econtext' is used to evaluate default expression for each column that is
- * either not read from the file or is using the DEFAULT option of COPY FROM.
- * It can be NULL when no default values are used, i.e. when all columns are
- * read from the file, and DEFAULT option is unset.
- *
- * 'values' and 'nulls' arrays must be the same length as columns of the
- * relation passed to BeginCopyFrom. This function fills the arrays.
- */
 bool
-NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
-			 Datum *values, bool *nulls)
+CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+						 Datum *values, bool *nulls)
 {
 	TupleDesc	tupDesc;
-	AttrNumber	num_phys_attrs,
-				attr_count,
-				num_defaults = cstate->num_defaults;
+	AttrNumber	attr_count;
+	int16		fld_count;
+	ListCell   *cur;
 	FmgrInfo   *in_functions = cstate->in_functions;
 	Oid		   *typioparams = cstate->typioparams;
-	int			i;
-	int		   *defmap = cstate->defmap;
-	ExprState **defexprs = cstate->defexprs;
 
-	tupDesc = RelationGetDescr(cstate->rel);
-	num_phys_attrs = tupDesc->natts;
 	attr_count = list_length(cstate->attnumlist);
 
-	/* Initialize all values for row to NULL */
-	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
-	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-	MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+	cstate->cur_lineno++;
 
-	if (!cstate->opts.binary)
+	if (!CopyGetInt16(cstate, &fld_count))
 	{
-		char	  **field_strings;
-		ListCell   *cur;
-		int			fldct;
-		int			fieldno;
-		char	   *string;
+		/* EOF detected (end of file, or protocol-level EOF) */
+		return false;
+	}
 
-		/* read raw fields in the next line */
-		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-			return false;
+	if (fld_count == -1)
+	{
+		/*
+		 * Received EOF marker.  Wait for the protocol-level EOF, and
+		 * complain if it doesn't come immediately.  In COPY FROM STDIN,
+		 * this ensures that we correctly handle CopyFail, if client
+		 * chooses to send that now.  When copying from file, we could
+		 * ignore the rest of the file like in text mode, but we choose to
+		 * be consistent with the COPY FROM STDIN case.
+		 */
+		char		dummy;
 
-		/* check for overflowing fields */
-		if (attr_count > 0 && fldct > attr_count)
+		if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
 			ereport(ERROR,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+						errmsg("received copy data after EOF marker")));
+		return false;
+	}
 
-		fieldno = 0;
+	if (fld_count != attr_count)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					errmsg("row field count is %d, expected %d",
+						(int) fld_count, attr_count)));
 
-		/* Loop to read the user attributes on the line. */
-		foreach(cur, cstate->attnumlist)
-		{
-			int			attnum = lfirst_int(cur);
-			int			m = attnum - 1;
-			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+	tupDesc = RelationGetDescr(cstate->rel);
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		int			m = attnum - 1;
+		Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+		cstate->cur_attname = NameStr(att->attname);
+		values[m] = CopyReadBinaryAttribute(cstate,
+											&in_functions[m],
+											typioparams[m],
+											att->atttypmod,
+											&nulls[m]);
+		cstate->cur_attname = NULL;
+	}
 
-			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
-			string = field_strings[fieldno++];
+	return true;
+}
 
-			if (cstate->convert_select_flags &&
-				!cstate->convert_select_flags[m])
-			{
-				/* ignore input field, leaving column as NULL */
-				continue;
-			}
+bool
+CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+					   Datum *values, bool *nulls)
+{
+	TupleDesc	tupDesc;
+	AttrNumber	attr_count;
+	FmgrInfo   *in_functions = cstate->in_functions;
+	Oid		   *typioparams = cstate->typioparams;
+	ExprState **defexprs = cstate->defexprs;
+	char	  **field_strings;
+	ListCell   *cur;
+	int			fldct;
+	int			fieldno = 0;
+	char	   *string;
 
-			if (cstate->opts.csv_mode)
-			{
-				if (string == NULL &&
-					cstate->opts.force_notnull_flags[m])
-				{
-					/*
-					 * FORCE_NOT_NULL option is set and column is NULL -
-					 * convert it to the NULL string.
-					 */
-					string = cstate->opts.null_print;
-				}
-				else if (string != NULL && cstate->opts.force_null_flags[m]
-						 && strcmp(string, cstate->opts.null_print) == 0)
-				{
-					/*
-					 * FORCE_NULL option is set and column matches the NULL
-					 * string. It must have been quoted, or otherwise the
-					 * string would already have been set to NULL. Convert it
-					 * to NULL as specified.
-					 */
-					string = NULL;
-				}
-			}
+	attr_count = list_length(cstate->attnumlist);
+
+	/* read raw fields in the next line */
+	if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+		return false;
+
+	/* check for overflowing fields */
+	if (attr_count > 0 && fldct > attr_count)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					errmsg("extra data after last expected column")));
 
-			cstate->cur_attname = NameStr(att->attname);
-			cstate->cur_attval = string;
+	tupDesc = RelationGetDescr(cstate->rel);
+	/* Loop to read the user attributes on the line. */
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		int			m = attnum - 1;
+		Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
-			if (string != NULL)
-				nulls[m] = false;
+		if (fieldno >= fldct)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						errmsg("missing data for column \"%s\"",
+							NameStr(att->attname))));
+		string = field_strings[fieldno++];
 
-			if (cstate->defaults[m])
+		if (cstate->convert_select_flags &&
+			!cstate->convert_select_flags[m])
+		{
+			/* ignore input field, leaving column as NULL */
+			continue;
+		}
+
+		if (cstate->opts.csv_mode)
+		{
+			if (string == NULL &&
+				cstate->opts.force_notnull_flags[m])
 			{
 				/*
-				 * The caller must supply econtext and have switched into the
-				 * per-tuple memory context in it.
+				 * FORCE_NOT_NULL option is set and column is NULL -
+				 * convert it to the NULL string.
 				 */
-				Assert(econtext != NULL);
-				Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-				values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+				string = cstate->opts.null_print;
+			}
+			else if (string != NULL && cstate->opts.force_null_flags[m]
+						&& strcmp(string, cstate->opts.null_print) == 0)
+			{
+				/*
+				 * FORCE_NULL option is set and column matches the NULL
+				 * string. It must have been quoted, or otherwise the
+				 * string would already have been set to NULL. Convert it
+				 * to NULL as specified.
+				 */
+				string = NULL;
 			}
-			else
-				values[m] = InputFunctionCall(&in_functions[m],
-											  string,
-											  typioparams[m],
-											  att->atttypmod);
-
-			cstate->cur_attname = NULL;
-			cstate->cur_attval = NULL;
 		}
 
-		Assert(fieldno == attr_count);
-	}
-	else
-	{
-		/* binary */
-		int16		fld_count;
-		ListCell   *cur;
+		cstate->cur_attname = NameStr(att->attname);
+		cstate->cur_attval = string;
 
-		cstate->cur_lineno++;
+		if (string != NULL)
+			nulls[m] = false;
 
-		if (!CopyGetInt16(cstate, &fld_count))
-		{
-			/* EOF detected (end of file, or protocol-level EOF) */
-			return false;
-		}
-
-		if (fld_count == -1)
+		if (cstate->defaults[m])
 		{
 			/*
-			 * Received EOF marker.  Wait for the protocol-level EOF, and
-			 * complain if it doesn't come immediately.  In COPY FROM STDIN,
-			 * this ensures that we correctly handle CopyFail, if client
-			 * chooses to send that now.  When copying from file, we could
-			 * ignore the rest of the file like in text mode, but we choose to
-			 * be consistent with the COPY FROM STDIN case.
+			 * The caller must supply econtext and have switched into the
+			 * per-tuple memory context in it.
 			 */
-			char		dummy;
+			Assert(econtext != NULL);
+			Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-			if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("received copy data after EOF marker")));
-			return false;
+			values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
 		}
+		else
+			values[m] = InputFunctionCall(&in_functions[m],
+											string,
+											typioparams[m],
+											att->atttypmod);
 
-		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+		cstate->cur_attname = NULL;
+		cstate->cur_attval = NULL;
+	}
 
-		foreach(cur, cstate->attnumlist)
-		{
-			int			attnum = lfirst_int(cur);
-			int			m = attnum - 1;
-			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-			cstate->cur_attname = NameStr(att->attname);
-			values[m] = CopyReadBinaryAttribute(cstate,
-												&in_functions[m],
-												typioparams[m],
-												att->atttypmod,
-												&nulls[m]);
-			cstate->cur_attname = NULL;
-		}
+	Assert(fieldno == attr_count);
+	return true;
+}
+
+/*
+ * Read next tuple from file for COPY FROM. Return false if no more tuples.
+ *
+ * 'econtext' is used to evaluate default expression for each column that is
+ * either not read from the file or is using the DEFAULT option of COPY FROM.
+ * It can be NULL when no default values are used, i.e. when all columns are
+ * read from the file, and DEFAULT option is unset.
+ *
+ * 'values' and 'nulls' arrays must be the same length as columns of the
+ * relation passed to BeginCopyFrom. This function fills the arrays.
+ */
+bool
+NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+			 Datum *values, bool *nulls)
+{
+	TupleDesc	tupDesc;
+	AttrNumber	num_phys_attrs,
+				num_defaults = cstate->num_defaults;
+	int			i;
+	int		   *defmap = cstate->defmap;
+	ExprState **defexprs = cstate->defexprs;
+
+	tupDesc = RelationGetDescr(cstate->rel);
+	num_phys_attrs = tupDesc->natts;
+
+	/* Initialize all values for row to NULL */
+	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
+	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
+	MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+
+	if (!cstate->opts.handler.copy_from_next(cstate, econtext, values, nulls))
+	{
+		return false;
 	}
 
 	/*
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..4538bc6292 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,205 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyHandlerOps implementation of COPY TO for "text" and "csv".
+ * CopyToFormatText*() check cstate->opts.csv_mode and adjust their behavior
+ * accordingly.  We can split this implementation later so that it no longer
+ * refers to cstate->opts.csv_mode.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+	switch (cstate->copy_dest)
+	{
+	case COPY_FILE:
+		/* Default line termination depends on platform */
+#ifndef WIN32
+		CopySendChar(cstate, '\n');
+#else
+		CopySendString(cstate, "\r\n");
+#endif
+		break;
+	case COPY_FRONTEND:
+		/* The FE/BE protocol uses \n as newline for all platforms */
+		CopySendChar(cstate, '\n');
+		break;
+	default:
+		break;
+	}
+	CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	int			num_phys_attrs;
+	ListCell   *cur;
+
+	num_phys_attrs = tupDesc->natts;
+	/* Get info about the columns we need to process. */
+	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Oid			out_func_oid;
+		bool		isvarlena;
+		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+		getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+	}
+
+	/*
+	 * For non-binary copy, we need to convert null_print to file
+	 * encoding, because it will be sent directly with CopySendString.
+	 */
+	if (cstate->need_transcoding)
+		cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+														  cstate->opts.null_print_len,
+														  cstate->file_encoding);
+
+	/* if a header has been requested send the line */
+	if (cstate->opts.header_line)
+	{
+		bool		hdr_delim = false;
+
+		foreach(cur, cstate->attnumlist)
+		{
+			int			attnum = lfirst_int(cur);
+			char	   *colname;
+
+			if (hdr_delim)
+				CopySendChar(cstate, cstate->opts.delim[0]);
+			hdr_delim = true;
+
+			colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+			if (cstate->opts.csv_mode)
+				CopyAttributeOutCSV(cstate, colname, false,
+									list_length(cstate->attnumlist) == 1);
+			else
+				CopyAttributeOutText(cstate, colname);
+		}
+
+		CopyToFormatTextSendEndOfRow(cstate);
+	}
+}
+
+void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	bool		need_delim = false;
+	FmgrInfo   *out_functions = cstate->out_functions;
+	ListCell   *cur;
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
+
+		if (need_delim)
+			CopySendChar(cstate, cstate->opts.delim[0]);
+		need_delim = true;
+
+		if (isnull)
+			CopySendString(cstate, cstate->opts.null_print_client);
+		else
+		{
+			char	   *string;
+
+			string = OutputFunctionCall(&out_functions[attnum - 1], value);
+			if (cstate->opts.csv_mode)
+				CopyAttributeOutCSV(cstate, string,
+									cstate->opts.force_quote_flags[attnum - 1],
+									list_length(cstate->attnumlist) == 1);
+			else
+				CopyAttributeOutText(cstate, string);
+		}
+	}
+
+	CopyToFormatTextSendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyHandlerOps implementation for "binary" COPY TO.
+ */
+
+void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	int			num_phys_attrs;
+	ListCell   *cur;
+
+	num_phys_attrs = tupDesc->natts;
+	/* Get info about the columns we need to process. */
+	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Oid			out_func_oid;
+		bool		isvarlena;
+		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+		getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+	}
+
+	/* Generate header for a binary copy */
+	/* Signature */
+	CopySendData(cstate, BinarySignature, 11);
+	/* Flags field */
+	CopySendInt32(cstate, 0);
+	/* No header extension */
+	CopySendInt32(cstate, 0);
+}
+
+void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	FmgrInfo   *out_functions = cstate->out_functions;
+	ListCell   *cur;
+
+	/* Binary per-tuple header */
+	CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
+
+		if (isnull)
+			CopySendInt32(cstate, -1);
+		else
+		{
+			bytea	   *outputbytes;
+
+			outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+			CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+			CopySendData(cstate, VARDATA(outputbytes),
+						 VARSIZE(outputbytes) - VARHDRSZ);
+		}
+	}
+
+	CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+	/* Generate trailer for a binary copy */
+	CopySendInt16(cstate, -1);
+	/* Need to flush out the trailer */
+	CopySendEndOfRow(cstate);
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -198,16 +397,6 @@ CopySendEndOfRow(CopyToState cstate)
 	switch (cstate->copy_dest)
 	{
 		case COPY_FILE:
-			if (!cstate->opts.binary)
-			{
-				/* Default line termination depends on platform */
-#ifndef WIN32
-				CopySendChar(cstate, '\n');
-#else
-				CopySendString(cstate, "\r\n");
-#endif
-			}
-
 			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
 					   cstate->copy_file) != 1 ||
 				ferror(cstate->copy_file))
@@ -242,10 +431,6 @@ CopySendEndOfRow(CopyToState cstate)
 			}
 			break;
 		case COPY_FRONTEND:
-			/* The FE/BE protocol uses \n as newline for all platforms */
-			if (!cstate->opts.binary)
-				CopySendChar(cstate, '\n');
-
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
 			break;
@@ -748,8 +933,6 @@ DoCopyTo(CopyToState cstate)
 	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
-	int			num_phys_attrs;
-	ListCell   *cur;
 	uint64		processed;
 
 	if (fe_copy)
@@ -759,32 +942,11 @@ DoCopyTo(CopyToState cstate)
 		tupDesc = RelationGetDescr(cstate->rel);
 	else
 		tupDesc = cstate->queryDesc->tupDesc;
-	num_phys_attrs = tupDesc->natts;
 	cstate->opts.null_print_client = cstate->opts.null_print;	/* default */
 
 	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
 	cstate->fe_msgbuf = makeStringInfo();
 
-	/* Get info about the columns we need to process. */
-	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-	foreach(cur, cstate->attnumlist)
-	{
-		int			attnum = lfirst_int(cur);
-		Oid			out_func_oid;
-		bool		isvarlena;
-		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
-		if (cstate->opts.binary)
-			getTypeBinaryOutputInfo(attr->atttypid,
-									&out_func_oid,
-									&isvarlena);
-		else
-			getTypeOutputInfo(attr->atttypid,
-							  &out_func_oid,
-							  &isvarlena);
-		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-	}
-
 	/*
 	 * Create a temporary memory context that we can reset once per row to
 	 * recover palloc'd memory.  This avoids any problems with leaks inside
@@ -795,57 +957,7 @@ DoCopyTo(CopyToState cstate)
 											   "COPY TO",
 											   ALLOCSET_DEFAULT_SIZES);
 
-	if (cstate->opts.binary)
-	{
-		/* Generate header for a binary copy */
-		int32		tmp;
-
-		/* Signature */
-		CopySendData(cstate, BinarySignature, 11);
-		/* Flags field */
-		tmp = 0;
-		CopySendInt32(cstate, tmp);
-		/* No header extension */
-		tmp = 0;
-		CopySendInt32(cstate, tmp);
-	}
-	else
-	{
-		/*
-		 * For non-binary copy, we need to convert null_print to file
-		 * encoding, because it will be sent directly with CopySendString.
-		 */
-		if (cstate->need_transcoding)
-			cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-															  cstate->opts.null_print_len,
-															  cstate->file_encoding);
-
-		/* if a header has been requested send the line */
-		if (cstate->opts.header_line)
-		{
-			bool		hdr_delim = false;
-
-			foreach(cur, cstate->attnumlist)
-			{
-				int			attnum = lfirst_int(cur);
-				char	   *colname;
-
-				if (hdr_delim)
-					CopySendChar(cstate, cstate->opts.delim[0]);
-				hdr_delim = true;
-
-				colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-				if (cstate->opts.csv_mode)
-					CopyAttributeOutCSV(cstate, colname, false,
-										list_length(cstate->attnumlist) == 1);
-				else
-					CopyAttributeOutText(cstate, colname);
-			}
-
-			CopySendEndOfRow(cstate);
-		}
-	}
+	cstate->opts.handler.copy_to_start(cstate, tupDesc);
 
 	if (cstate->rel)
 	{
@@ -884,13 +996,7 @@ DoCopyTo(CopyToState cstate)
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
-	if (cstate->opts.binary)
-	{
-		/* Generate trailer for a binary copy */
-		CopySendInt16(cstate, -1);
-		/* Need to flush out the trailer */
-		CopySendEndOfRow(cstate);
-	}
+	cstate->opts.handler.copy_to_end(cstate);
 
 	MemoryContextDelete(cstate->rowcontext);
 
@@ -906,71 +1012,15 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-	bool		need_delim = false;
-	FmgrInfo   *out_functions = cstate->out_functions;
 	MemoryContext oldcontext;
-	ListCell   *cur;
-	char	   *string;
 
 	MemoryContextReset(cstate->rowcontext);
 	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-	if (cstate->opts.binary)
-	{
-		/* Binary per-tuple header */
-		CopySendInt16(cstate, list_length(cstate->attnumlist));
-	}
-
 	/* Make sure the tuple is fully deconstructed */
 	slot_getallattrs(slot);
 
-	foreach(cur, cstate->attnumlist)
-	{
-		int			attnum = lfirst_int(cur);
-		Datum		value = slot->tts_values[attnum - 1];
-		bool		isnull = slot->tts_isnull[attnum - 1];
-
-		if (!cstate->opts.binary)
-		{
-			if (need_delim)
-				CopySendChar(cstate, cstate->opts.delim[0]);
-			need_delim = true;
-		}
-
-		if (isnull)
-		{
-			if (!cstate->opts.binary)
-				CopySendString(cstate, cstate->opts.null_print_client);
-			else
-				CopySendInt32(cstate, -1);
-		}
-		else
-		{
-			if (!cstate->opts.binary)
-			{
-				string = OutputFunctionCall(&out_functions[attnum - 1],
-											value);
-				if (cstate->opts.csv_mode)
-					CopyAttributeOutCSV(cstate, string,
-										cstate->opts.force_quote_flags[attnum - 1],
-										list_length(cstate->attnumlist) == 1);
-				else
-					CopyAttributeOutText(cstate, string);
-			}
-			else
-			{
-				bytea	   *outputbytes;
-
-				outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-											   value);
-				CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-				CopySendData(cstate, VARDATA(outputbytes),
-							 VARSIZE(outputbytes) - VARHDRSZ);
-			}
-		}
-	}
-
-	CopySendEndOfRow(cstate);
+	cstate->opts.handler.copy_to_one_row(cstate, slot);
 
 	MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..5b3ffcd190 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,40 @@ typedef enum CopyHeaderChoice
 	COPY_HEADER_MATCH,
 } CopyHeaderChoice;
 
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyHandlerOps
+{
+	/* Called when COPY TO is started. This will send a header. */
+	void		(*copy_to_start) (CopyToState cstate, TupleDesc tupDesc);
+
+	/* Copy one row for COPY TO. */
+	void		(*copy_to_one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+	/* Called when COPY TO is ended. This will send a trailer. */
+	void		(*copy_to_end) (CopyToState cstate);
+
+	/*
+	 * Callbacks for COPY FROM.  The predefined text handler leaves
+	 * copy_from_end set to NULL, so that member is presumably optional.
+	 * NOTE(review): document when each callback is invoked and what
+	 * copy_from_next's return value means.
+	 */
+	void		(*copy_from_start) (CopyFromState cstate, TupleDesc tupDesc);
+	bool		(*copy_from_next) (CopyFromState cstate, ExprContext *econtext,
+								   Datum *values, bool *nulls);
+	void		(*copy_from_error_callback) (CopyFromState cstate);
+	void		(*copy_from_end) (CopyFromState cstate);
+} CopyHandlerOps;
+
+/* Predefined CopyHandlerOps for "text", "csv" and "binary". */
+extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsText;
+extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsCSV;
+extern PGDLLIMPORT const CopyHandlerOps CopyHandlerOpsBinary;
+
 /*
  * A struct to hold COPY options, in a parsed form. All of these are related
  * to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +97,9 @@ typedef struct CopyFormatOptions
 	bool	   *force_null_flags;	/* per-column CSV FN flags */
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select; /* list of column names (can be NIL) */
+	CopyHandlerOps handler;		/* copy handler operations */
 } CopyFormatOptions;
 
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
 
@@ -102,4 +133,20 @@ extern uint64 DoCopyTo(CopyToState cstate);
 extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
 							List *attnamelist);
 
+extern void CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatTextEnd(CopyToState cstate);
+extern void CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+								   Datum *values, bool *nulls);
+extern void CopyFromFormatTextErrorCallback(CopyFromState cstate);
+
+extern void CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatBinaryEnd(CopyToState cstate);
+extern void CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+									 Datum *values, bool *nulls);
+extern void CopyFromFormatBinaryErrorCallback(CopyFromState cstate);
+
 #endif							/* COPY_H */
-- 
2.41.0

