From 30a383c048d948c370e26149820c4bc5ea004d94 Mon Sep 17 00:00:00 2001
From: Zhao Junwang <zhjwpku@gmail.com>
Date: Wed, 6 Dec 2023 19:13:22 +0800
Subject: [PATCH v5] Extract COPY handlers

---
 src/backend/catalog/Makefile            |   4 +-
 src/backend/commands/copy.c             | 159 ++++++++++-
 src/backend/commands/copyfrom.c         | 269 ++++++++++--------
 src/backend/commands/copyfromparse.c    | 309 +++++++++++----------
 src/backend/commands/copyto.c           | 354 ++++++++++++++----------
 src/backend/nodes/Makefile              |   1 +
 src/backend/nodes/gen_node_support.pl   |   2 +
 src/backend/utils/adt/pseudotypes.c     |   1 +
 src/include/catalog/meson.build         |   2 +
 src/include/catalog/pg_copy_handler.dat |  25 ++
 src/include/catalog/pg_copy_handler.h   |  50 ++++
 src/include/catalog/pg_proc.dat         |  21 ++
 src/include/catalog/pg_type.dat         |   7 +
 src/include/commands/copy.h             |  51 +++-
 src/test/regress/expected/oidjoins.out  |   1 +
 15 files changed, 839 insertions(+), 417 deletions(-)
 create mode 100644 src/include/catalog/pg_copy_handler.dat
 create mode 100644 src/include/catalog/pg_copy_handler.h

diff --git a/src/backend/catalog/Makefile b/src/backend/catalog/Makefile
index ec7b6f5362..b0c4cdcdf2 100644
--- a/src/backend/catalog/Makefile
+++ b/src/backend/catalog/Makefile
@@ -118,7 +118,8 @@ CATALOG_HEADERS := \
 	pg_publication_namespace.h \
 	pg_publication_rel.h \
 	pg_subscription.h \
-	pg_subscription_rel.h
+	pg_subscription_rel.h \
+	pg_copy_handler.h \
 
 GENERATED_HEADERS := $(CATALOG_HEADERS:%.h=%_d.h) schemapg.h system_fk_info.h
 
@@ -150,6 +151,7 @@ POSTGRES_BKI_DATA = $(addprefix $(top_srcdir)/src/include/catalog/,\
 	pg_ts_parser.dat \
 	pg_ts_template.dat \
 	pg_type.dat \
+	pg_copy_handler.dat \
 	)
 
 all: generated-header-symlinks
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index cfad47b562..d5c1cb50a3 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -18,10 +18,12 @@
 #include <unistd.h>
 #include <sys/stat.h>
 
+#include "access/genam.h"
 #include "access/sysattr.h"
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/pg_authid.h"
+#include "catalog/pg_copy_handler.h"
 #include "commands/copy.h"
 #include "commands/defrem.h"
 #include "executor/executor.h"
@@ -36,6 +38,8 @@
 #include "rewrite/rewriteHandler.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
+#include "utils/fmgroids.h"
+#include "utils/formatting.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
@@ -427,6 +431,8 @@ ProcessCopyOptions(ParseState *pstate,
 
 	opts_out->file_encoding = -1;
 
+	/* Text is the default format. */
+	opts_out->handler = GetCopyRoutineByName(DEFAULT_COPY_HANDLER);
 	/* Extract options from the statement node tree */
 	foreach(option, options)
 	{
@@ -439,17 +445,11 @@ ProcessCopyOptions(ParseState *pstate,
 			if (format_specified)
 				errorConflictingDefElem(defel, pstate);
 			format_specified = true;
-			if (strcmp(fmt, "text") == 0)
-				 /* default format */ ;
-			else if (strcmp(fmt, "csv") == 0)
+			opts_out->handler = GetCopyRoutineByName(fmt);
+			if (strcmp(fmt, "csv") == 0)
 				opts_out->csv_mode = true;
 			else if (strcmp(fmt, "binary") == 0)
 				opts_out->binary = true;
-			else
-				ereport(ERROR,
-						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-						 errmsg("COPY format \"%s\" not recognized", fmt),
-						 parser_errposition(pstate, defel->location)));
 		}
 		else if (strcmp(defel->defname, "freeze") == 0)
 		{
@@ -864,3 +864,146 @@ CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
 
 	return attnums;
 }
+
+static const
+CopyRoutine CopyRoutineText = {
+	.type = T_CopyRoutine,
+	.to_start = CopyToFormatTextStart,
+	.to_one_row = CopyToFormatTextOneRow,
+	.to_end = CopyToFormatTextEnd,
+	.from_start = CopyFromFormatTextStart,
+	.from_next = CopyFromFormatTextNext,
+	.from_error_callback = CopyFromFormatTextErrorCallback,
+};
+
+/*
+ * We can use the same CopyRoutine for both of "text" and "csv" because
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their
+ * behavior. We can split the implementations and stop referring
+ * cstate->opts.csv_mode later.
+ */
+static const
+CopyRoutine CopyRoutineCSV = {
+	.type = T_CopyRoutine,
+	.to_start = CopyToFormatTextStart,
+	.to_one_row = CopyToFormatTextOneRow,
+	.to_end = CopyToFormatTextEnd,
+	.from_start = CopyFromFormatTextStart,
+	.from_next = CopyFromFormatTextNext,
+	.from_error_callback = CopyFromFormatTextErrorCallback,
+};
+
+static const
+CopyRoutine CopyRoutineBinary = {
+	.type = T_CopyRoutine,
+	.to_start = CopyToFormatBinaryStart,
+	.to_one_row = CopyToFormatBinaryOneRow,
+	.to_end = CopyToFormatBinaryEnd,
+	.from_start = CopyFromFormatBinaryStart,
+	.from_next = CopyFromFormatBinaryNext,
+	.from_error_callback = CopyFromFormatBinaryErrorCallback,
+};
+
+Datum
+text_copy_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&CopyRoutineText);
+}
+
+Datum
+csv_copy_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&CopyRoutineCSV);
+}
+
+Datum
+binary_copy_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&CopyRoutineBinary);
+}
+
+static NameData
+fmt_to_name(char *fmt)
+{
+	char		   *lcf; /* lower cased fmt */
+	size_t			len;
+	NameData		fmtname;
+
+	if (strlen(fmt) >= NAMEDATALEN)
+		elog(ERROR, "fmt name \"%s\" exceeds maximum name length "
+			 "of %d bytes", fmt, NAMEDATALEN - 1);
+
+	len = strlen(fmt);
+	lcf = asc_tolower(fmt, len);
+	len = strlen(lcf);
+	
+	memcpy(&(NameStr(fmtname)), lcf, len);
+	NameStr(fmtname)[len] = '\0';
+	pfree(lcf);
+
+	return fmtname;
+}
+
+CopyRoutine *
+GetCopyRoutine(Oid copyhandler)
+{
+	Datum			datum;
+	CopyRoutine	   *routine;
+
+	datum = OidFunctionCall0(copyhandler);
+	routine = (CopyRoutine *) DatumGetPointer(datum);
+
+	if (routine == NULL || !IsA(routine, CopyRoutine))
+		elog(ERROR, "copy handler function %u did not return an CopyRoutine struct",
+			 copyhandler);
+
+	return routine;
+}
+
+CopyRoutine *
+GetCopyRoutineByName(char *fmt)
+{
+	HeapTuple		tuple;
+	NameData		fmtname;
+	Relation		chrel;
+	ScanKeyData		scankey;
+	SysScanDesc		scan;
+	Form_pg_copy_handler chform;
+	regproc			copyhandler;
+
+	fmtname = fmt_to_name(fmt);
+
+	chrel = table_open(CopyHandlerRelationId, AccessShareLock);
+
+	ScanKeyInit(&scankey,
+				Anum_pg_copy_handler_chname,
+				BTEqualStrategyNumber, F_NAMEEQ,
+				NameGetDatum(&fmtname));
+
+	scan = systable_beginscan(chrel, CopyHandlerNameIndexId, true,
+							  NULL, 1, &scankey);
+	tuple = systable_getnext(scan);
+	if (!HeapTupleIsValid(tuple))
+		ereport(ERROR,
+				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+				 errmsg("COPY format \"%s\" not recognized", fmt)));
+
+	chform = (Form_pg_copy_handler)GETSTRUCT(tuple);
+
+	copyhandler = chform->copyhandler;
+
+	/* Complain if handler OID is invalid */
+	if (!RegProcedureIsValid(copyhandler))
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("index access method \"%s\" does not have a handler",
+						NameStr(chform->chname))));
+	}
+
+	systable_endscan(scan);
+	table_close(chrel, AccessShareLock);
+
+	/* And finally, call the handler function to get the API struct. */
+	return GetCopyRoutine(copyhandler);
+}
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f4861652a9..c2d92108c1 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -107,83 +107,88 @@ static char *limit_printout_length(const char *str);
 
 static void ClosePipeFromProgram(CopyFromState cstate);
 
-/*
- * error context callback for COPY FROM
- *
- * The argument for the error context must be CopyFromState.
- */
 void
-CopyFromErrorCallback(void *arg)
+CopyFromFormatBinaryErrorCallback(CopyFromState cstate)
 {
-	CopyFromState cstate = (CopyFromState) arg;
+	/* can't usefully display the data */
+	if (cstate->cur_attname)
+		errcontext("COPY %s, line %llu, column %s",
+				   cstate->cur_relname,
+				   (unsigned long long) cstate->cur_lineno,
+				   cstate->cur_attname);
+	else
+		errcontext("COPY %s, line %llu",
+				   cstate->cur_relname,
+				   (unsigned long long) cstate->cur_lineno);
+}
 
-	if (cstate->relname_only)
+void
+CopyFromFormatTextErrorCallback(CopyFromState cstate)
+{
+	if (cstate->cur_attname && cstate->cur_attval)
 	{
-		errcontext("COPY %s",
-				   cstate->cur_relname);
-		return;
+		/* error is relevant to a particular column */
+		char	   *attval;
+
+		attval = limit_printout_length(cstate->cur_attval);
+		errcontext("COPY %s, line %llu, column %s: \"%s\"",
+				   cstate->cur_relname,
+				   (unsigned long long) cstate->cur_lineno,
+				   cstate->cur_attname,
+				   attval);
+		pfree(attval);
 	}
-	if (cstate->opts.binary)
+	else if (cstate->cur_attname)
 	{
-		/* can't usefully display the data */
-		if (cstate->cur_attname)
-			errcontext("COPY %s, line %llu, column %s",
-					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno,
-					   cstate->cur_attname);
-		else
-			errcontext("COPY %s, line %llu",
-					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno);
+		/* error is relevant to a particular column, value is NULL */
+		errcontext("COPY %s, line %llu, column %s: null input",
+				   cstate->cur_relname,
+				   (unsigned long long) cstate->cur_lineno,
+				   cstate->cur_attname);
 	}
 	else
 	{
-		if (cstate->cur_attname && cstate->cur_attval)
+		/*
+		 * Error is relevant to a particular line.
+		 *
+		 * If line_buf still contains the correct line, print it.
+		 */
+		if (cstate->line_buf_valid)
 		{
-			/* error is relevant to a particular column */
-			char	   *attval;
+			char	   *lineval;
 
-			attval = limit_printout_length(cstate->cur_attval);
-			errcontext("COPY %s, line %llu, column %s: \"%s\"",
+			lineval = limit_printout_length(cstate->line_buf.data);
+			errcontext("COPY %s, line %llu: \"%s\"",
 					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno,
-					   cstate->cur_attname,
-					   attval);
-			pfree(attval);
+					   (unsigned long long) cstate->cur_lineno, lineval);
+			pfree(lineval);
 		}
-		else if (cstate->cur_attname)
+		else
 		{
-			/* error is relevant to a particular column, value is NULL */
-			errcontext("COPY %s, line %llu, column %s: null input",
+			errcontext("COPY %s, line %llu",
 					   cstate->cur_relname,
-					   (unsigned long long) cstate->cur_lineno,
-					   cstate->cur_attname);
+					   (unsigned long long) cstate->cur_lineno);
 		}
-		else
-		{
-			/*
-			 * Error is relevant to a particular line.
-			 *
-			 * If line_buf still contains the correct line, print it.
-			 */
-			if (cstate->line_buf_valid)
-			{
-				char	   *lineval;
+	}
+}
 
-				lineval = limit_printout_length(cstate->line_buf.data);
-				errcontext("COPY %s, line %llu: \"%s\"",
-						   cstate->cur_relname,
-						   (unsigned long long) cstate->cur_lineno, lineval);
-				pfree(lineval);
-			}
-			else
-			{
-				errcontext("COPY %s, line %llu",
-						   cstate->cur_relname,
-						   (unsigned long long) cstate->cur_lineno);
-			}
-		}
+/*
+ * error context callback for COPY FROM
+ *
+ * The argument for the error context must be CopyFromState.
+ */
+void
+CopyFromErrorCallback(void *arg)
+{
+	CopyFromState cstate = (CopyFromState) arg;
+
+	if (cstate->relname_only)
+	{
+		errcontext("COPY %s",
+				   cstate->cur_relname);
+		return;
 	}
+	cstate->opts.handler->from_error_callback(cstate);
 }
 
 /*
@@ -1320,6 +1325,99 @@ CopyFrom(CopyFromState cstate)
 	return processed;
 }
 
+void
+CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	FmgrInfo   *in_functions;
+	Oid		   *typioparams;
+	Oid			in_func_oid;
+	AttrNumber	num_phys_attrs;
+
+	/*
+	 * Pick up the required catalog information for each attribute in the
+	 * relation, including the input function, the element type (to pass to
+	 * the input function).
+	 */
+	num_phys_attrs = tupDesc->natts;
+	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+	for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+		/* We don't need info for dropped attributes */
+		if (att->attisdropped)
+			continue;
+
+		/* Fetch the input function and typioparam info */
+		getTypeBinaryInputInfo(att->atttypid,
+							   &in_func_oid, &typioparams[attnum - 1]);
+
+		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+	}
+	cstate->in_functions = in_functions;
+	cstate->typioparams = typioparams;
+}
+
+void
+CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc)
+{
+	FmgrInfo   *in_functions;
+	Oid		   *typioparams;
+	Oid			in_func_oid;
+	AttrNumber	attr_count,
+				num_phys_attrs;
+
+	num_phys_attrs = tupDesc->natts;
+
+	/*
+	 * If encoding conversion is needed, we need another buffer to hold
+	 * the converted input data.  Otherwise, we can just point input_buf
+	 * to the same buffer as raw_buf.
+	 */
+	if (cstate->need_transcoding)
+	{
+		cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
+		cstate->input_buf_index = cstate->input_buf_len = 0;
+	}
+	else
+		cstate->input_buf = cstate->raw_buf;
+	cstate->input_reached_eof = false;
+
+	initStringInfo(&cstate->line_buf);
+
+	/* create workspace for CopyReadAttributes results */
+	attr_count = list_length(cstate->attnumlist);
+
+	cstate->max_fields = attr_count;
+	cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
+
+	/*
+	 * Pick up the required catalog information for each attribute in the
+	 * relation, including the input function, the element type (to pass to
+	 * the input function).
+	 */
+	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+
+	for (int attnum = 1; attnum <= num_phys_attrs; attnum++)
+	{
+		Form_pg_attribute att = TupleDescAttr(tupDesc, attnum - 1);
+
+		/* We don't need info for dropped attributes */
+		if (att->attisdropped)
+			continue;
+
+		/* Fetch the input function and typioparam info */
+		getTypeInputInfo(att->atttypid,
+						 &in_func_oid, &typioparams[attnum - 1]);
+		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
+	}
+	cstate->in_functions = in_functions;
+	cstate->typioparams = typioparams;
+}
+
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -1348,9 +1446,6 @@ BeginCopyFrom(ParseState *pstate,
 	TupleDesc	tupDesc;
 	AttrNumber	num_phys_attrs,
 				num_defaults;
-	FmgrInfo   *in_functions;
-	Oid		   *typioparams;
-	Oid			in_func_oid;
 	int		   *defmap;
 	ExprState **defexprs;
 	MemoryContext oldcontext;
@@ -1518,25 +1613,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->raw_buf_index = cstate->raw_buf_len = 0;
 	cstate->raw_reached_eof = false;
 
-	if (!cstate->opts.binary)
-	{
-		/*
-		 * If encoding conversion is needed, we need another buffer to hold
-		 * the converted input data.  Otherwise, we can just point input_buf
-		 * to the same buffer as raw_buf.
-		 */
-		if (cstate->need_transcoding)
-		{
-			cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1);
-			cstate->input_buf_index = cstate->input_buf_len = 0;
-		}
-		else
-			cstate->input_buf = cstate->raw_buf;
-		cstate->input_reached_eof = false;
-
-		initStringInfo(&cstate->line_buf);
-	}
-
 	initStringInfo(&cstate->attribute_buf);
 
 	/* Assign range table and rteperminfos, we'll need them in CopyFrom. */
@@ -1546,17 +1622,14 @@ BeginCopyFrom(ParseState *pstate,
 		cstate->rteperminfos = pstate->p_rteperminfos;
 	}
 
-	num_defaults = 0;
-	volatile_defexprs = false;
+	cstate->opts.handler->from_start(cstate, tupDesc);
 
 	/*
-	 * Pick up the required catalog information for each attribute in the
-	 * relation, including the input function, the element type (to pass to
-	 * the input function), and info about defaults and constraints. (Which
+	 * Pick up info about defaults and constraints. (Which
 	 * input function we use depends on text/binary format choice.)
 	 */
-	in_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-	typioparams = (Oid *) palloc(num_phys_attrs * sizeof(Oid));
+	num_defaults = 0;
+	volatile_defexprs = false;
 	defmap = (int *) palloc(num_phys_attrs * sizeof(int));
 	defexprs = (ExprState **) palloc(num_phys_attrs * sizeof(ExprState *));
 
@@ -1568,15 +1641,6 @@ BeginCopyFrom(ParseState *pstate,
 		if (att->attisdropped)
 			continue;
 
-		/* Fetch the input function and typioparam info */
-		if (cstate->opts.binary)
-			getTypeBinaryInputInfo(att->atttypid,
-								   &in_func_oid, &typioparams[attnum - 1]);
-		else
-			getTypeInputInfo(att->atttypid,
-							 &in_func_oid, &typioparams[attnum - 1]);
-		fmgr_info(in_func_oid, &in_functions[attnum - 1]);
-
 		/* Get default info if available */
 		defexprs[attnum - 1] = NULL;
 
@@ -1636,8 +1700,6 @@ BeginCopyFrom(ParseState *pstate,
 	cstate->bytes_processed = 0;
 
 	/* We keep those variables in cstate. */
-	cstate->in_functions = in_functions;
-	cstate->typioparams = typioparams;
 	cstate->defmap = defmap;
 	cstate->defexprs = defexprs;
 	cstate->volatile_defexprs = volatile_defexprs;
@@ -1716,15 +1778,6 @@ BeginCopyFrom(ParseState *pstate,
 		ReceiveCopyBinaryHeader(cstate);
 	}
 
-	/* create workspace for CopyReadAttributes results */
-	if (!cstate->opts.binary)
-	{
-		AttrNumber	attr_count = list_length(cstate->attnumlist);
-
-		cstate->max_fields = attr_count;
-		cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *));
-	}
-
 	MemoryContextSwitchTo(oldcontext);
 
 	return cstate;
diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index f553734582..bbe5bd1166 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -839,187 +839,208 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields)
 	return true;
 }
 
-/*
- * Read next tuple from file for COPY FROM. Return false if no more tuples.
- *
- * 'econtext' is used to evaluate default expression for each column that is
- * either not read from the file or is using the DEFAULT option of COPY FROM.
- * It can be NULL when no default values are used, i.e. when all columns are
- * read from the file, and DEFAULT option is unset.
- *
- * 'values' and 'nulls' arrays must be the same length as columns of the
- * relation passed to BeginCopyFrom. This function fills the arrays.
- */
 bool
-NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
-			 Datum *values, bool *nulls)
+CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+						 Datum *values, bool *nulls)
 {
 	TupleDesc	tupDesc;
-	AttrNumber	num_phys_attrs,
-				attr_count,
-				num_defaults = cstate->num_defaults;
+	AttrNumber	attr_count;
+	int16		fld_count;
+	ListCell   *cur;
 	FmgrInfo   *in_functions = cstate->in_functions;
 	Oid		   *typioparams = cstate->typioparams;
-	int			i;
-	int		   *defmap = cstate->defmap;
-	ExprState **defexprs = cstate->defexprs;
 
-	tupDesc = RelationGetDescr(cstate->rel);
-	num_phys_attrs = tupDesc->natts;
 	attr_count = list_length(cstate->attnumlist);
 
-	/* Initialize all values for row to NULL */
-	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
-	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
-	MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+	cstate->cur_lineno++;
 
-	if (!cstate->opts.binary)
+	if (!CopyGetInt16(cstate, &fld_count))
 	{
-		char	  **field_strings;
-		ListCell   *cur;
-		int			fldct;
-		int			fieldno;
-		char	   *string;
+		/* EOF detected (end of file, or protocol-level EOF) */
+		return false;
+	}
 
-		/* read raw fields in the next line */
-		if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
-			return false;
+	if (fld_count == -1)
+	{
+		/*
+		 * Received EOF marker.  Wait for the protocol-level EOF, and
+		 * complain if it doesn't come immediately.  In COPY FROM STDIN,
+		 * this ensures that we correctly handle CopyFail, if client
+		 * chooses to send that now.  When copying from file, we could
+		 * ignore the rest of the file like in text mode, but we choose to
+		 * be consistent with the COPY FROM STDIN case.
+		 */
+		char		dummy;
 
-		/* check for overflowing fields */
-		if (attr_count > 0 && fldct > attr_count)
+		if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
 			ereport(ERROR,
 					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("extra data after last expected column")));
+						errmsg("received copy data after EOF marker")));
+		return false;
+	}
 
-		fieldno = 0;
+	if (fld_count != attr_count)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					errmsg("row field count is %d, expected %d",
+						(int) fld_count, attr_count)));
 
-		/* Loop to read the user attributes on the line. */
-		foreach(cur, cstate->attnumlist)
-		{
-			int			attnum = lfirst_int(cur);
-			int			m = attnum - 1;
-			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+	tupDesc = RelationGetDescr(cstate->rel);
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		int			m = attnum - 1;
+		Form_pg_attribute att = TupleDescAttr(tupDesc, m);
+
+		cstate->cur_attname = NameStr(att->attname);
+		values[m] = CopyReadBinaryAttribute(cstate,
+											&in_functions[m],
+											typioparams[m],
+											att->atttypmod,
+											&nulls[m]);
+		cstate->cur_attname = NULL;
+	}
 
-			if (fieldno >= fldct)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("missing data for column \"%s\"",
-								NameStr(att->attname))));
-			string = field_strings[fieldno++];
+	return true;
+}
 
-			if (cstate->convert_select_flags &&
-				!cstate->convert_select_flags[m])
-			{
-				/* ignore input field, leaving column as NULL */
-				continue;
-			}
+bool
+CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+					   Datum *values, bool *nulls)
+{
+	TupleDesc	tupDesc;
+	AttrNumber	attr_count;
+	FmgrInfo   *in_functions = cstate->in_functions;
+	Oid		   *typioparams = cstate->typioparams;
+	ExprState **defexprs = cstate->defexprs;
+	char	  **field_strings;
+	ListCell   *cur;
+	int			fldct;
+	int			fieldno = 0;
+	char	   *string;
 
-			if (cstate->opts.csv_mode)
-			{
-				if (string == NULL &&
-					cstate->opts.force_notnull_flags[m])
-				{
-					/*
-					 * FORCE_NOT_NULL option is set and column is NULL -
-					 * convert it to the NULL string.
-					 */
-					string = cstate->opts.null_print;
-				}
-				else if (string != NULL && cstate->opts.force_null_flags[m]
-						 && strcmp(string, cstate->opts.null_print) == 0)
-				{
-					/*
-					 * FORCE_NULL option is set and column matches the NULL
-					 * string. It must have been quoted, or otherwise the
-					 * string would already have been set to NULL. Convert it
-					 * to NULL as specified.
-					 */
-					string = NULL;
-				}
-			}
+	attr_count = list_length(cstate->attnumlist);
+
+	/* read raw fields in the next line */
+	if (!NextCopyFromRawFields(cstate, &field_strings, &fldct))
+		return false;
+
+	/* check for overflowing fields */
+	if (attr_count > 0 && fldct > attr_count)
+		ereport(ERROR,
+				(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+					errmsg("extra data after last expected column")));
 
-			cstate->cur_attname = NameStr(att->attname);
-			cstate->cur_attval = string;
+	tupDesc = RelationGetDescr(cstate->rel);
+	/* Loop to read the user attributes on the line. */
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		int			m = attnum - 1;
+		Form_pg_attribute att = TupleDescAttr(tupDesc, m);
 
-			if (string != NULL)
-				nulls[m] = false;
+		if (fieldno >= fldct)
+			ereport(ERROR,
+					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
+						errmsg("missing data for column \"%s\"",
+							NameStr(att->attname))));
+		string = field_strings[fieldno++];
 
-			if (cstate->defaults[m])
+		if (cstate->convert_select_flags &&
+			!cstate->convert_select_flags[m])
+		{
+			/* ignore input field, leaving column as NULL */
+			continue;
+		}
+
+		if (cstate->opts.csv_mode)
+		{
+			if (string == NULL &&
+				cstate->opts.force_notnull_flags[m])
 			{
 				/*
-				 * The caller must supply econtext and have switched into the
-				 * per-tuple memory context in it.
+				 * FORCE_NOT_NULL option is set and column is NULL -
+				 * convert it to the NULL string.
 				 */
-				Assert(econtext != NULL);
-				Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
-
-				values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
+				string = cstate->opts.null_print;
+			}
+			else if (string != NULL && cstate->opts.force_null_flags[m]
+						&& strcmp(string, cstate->opts.null_print) == 0)
+			{
+				/*
+				 * FORCE_NULL option is set and column matches the NULL
+				 * string. It must have been quoted, or otherwise the
+				 * string would already have been set to NULL. Convert it
+				 * to NULL as specified.
+				 */
+				string = NULL;
 			}
-			else
-				values[m] = InputFunctionCall(&in_functions[m],
-											  string,
-											  typioparams[m],
-											  att->atttypmod);
-
-			cstate->cur_attname = NULL;
-			cstate->cur_attval = NULL;
 		}
 
-		Assert(fieldno == attr_count);
-	}
-	else
-	{
-		/* binary */
-		int16		fld_count;
-		ListCell   *cur;
+		cstate->cur_attname = NameStr(att->attname);
+		cstate->cur_attval = string;
 
-		cstate->cur_lineno++;
+		if (string != NULL)
+			nulls[m] = false;
 
-		if (!CopyGetInt16(cstate, &fld_count))
-		{
-			/* EOF detected (end of file, or protocol-level EOF) */
-			return false;
-		}
-
-		if (fld_count == -1)
+		if (cstate->defaults[m])
 		{
 			/*
-			 * Received EOF marker.  Wait for the protocol-level EOF, and
-			 * complain if it doesn't come immediately.  In COPY FROM STDIN,
-			 * this ensures that we correctly handle CopyFail, if client
-			 * chooses to send that now.  When copying from file, we could
-			 * ignore the rest of the file like in text mode, but we choose to
-			 * be consistent with the COPY FROM STDIN case.
+			 * The caller must supply econtext and have switched into the
+			 * per-tuple memory context in it.
 			 */
-			char		dummy;
+			Assert(econtext != NULL);
+			Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory);
 
-			if (CopyReadBinaryData(cstate, &dummy, 1) > 0)
-				ereport(ERROR,
-						(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-						 errmsg("received copy data after EOF marker")));
-			return false;
+			values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]);
 		}
+		else
+			values[m] = InputFunctionCall(&in_functions[m],
+											string,
+											typioparams[m],
+											att->atttypmod);
 
-		if (fld_count != attr_count)
-			ereport(ERROR,
-					(errcode(ERRCODE_BAD_COPY_FILE_FORMAT),
-					 errmsg("row field count is %d, expected %d",
-							(int) fld_count, attr_count)));
+		cstate->cur_attname = NULL;
+		cstate->cur_attval = NULL;
+	}
 
-		foreach(cur, cstate->attnumlist)
-		{
-			int			attnum = lfirst_int(cur);
-			int			m = attnum - 1;
-			Form_pg_attribute att = TupleDescAttr(tupDesc, m);
-
-			cstate->cur_attname = NameStr(att->attname);
-			values[m] = CopyReadBinaryAttribute(cstate,
-												&in_functions[m],
-												typioparams[m],
-												att->atttypmod,
-												&nulls[m]);
-			cstate->cur_attname = NULL;
-		}
+	Assert(fieldno == attr_count);
+	return true;
+}
+
+/*
+ * Read next tuple from file for COPY FROM. Return false if no more tuples.
+ *
+ * 'econtext' is used to evaluate default expression for each column that is
+ * either not read from the file or is using the DEFAULT option of COPY FROM.
+ * It can be NULL when no default values are used, i.e. when all columns are
+ * read from the file, and DEFAULT option is unset.
+ *
+ * 'values' and 'nulls' arrays must be the same length as columns of the
+ * relation passed to BeginCopyFrom. This function fills the arrays.
+ */
+bool
+NextCopyFrom(CopyFromState cstate, ExprContext *econtext,
+			 Datum *values, bool *nulls)
+{
+	TupleDesc	tupDesc;
+	AttrNumber	num_phys_attrs,
+				num_defaults = cstate->num_defaults;
+	int			i;
+	int		   *defmap = cstate->defmap;
+	ExprState **defexprs = cstate->defexprs;
+
+	tupDesc = RelationGetDescr(cstate->rel);
+	num_phys_attrs = tupDesc->natts;
+
+	/* Initialize all values for row to NULL */
+	MemSet(values, 0, num_phys_attrs * sizeof(Datum));
+	MemSet(nulls, true, num_phys_attrs * sizeof(bool));
+	MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool));
+
+	if (!cstate->opts.handler->from_next(cstate, econtext, values, nulls))
+	{
+		return false;
 	}
 
 	/*
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index c66a047c4a..f906a6cf7f 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -131,6 +131,205 @@ static void CopySendEndOfRow(CopyToState cstate);
 static void CopySendInt32(CopyToState cstate, int32 val);
 static void CopySendInt16(CopyToState cstate, int16 val);
 
+/*
+ * CopyHandlerOps implementation of COPY TO for "text" and "csv".
+ * CopyToFormatText*() refer cstate->opts.csv_mode and change their behavior.
+ * We can split this implementation and stop referring cstate->opts.csv_mode
+ * later.
+ */
+
+static void
+CopyToFormatTextSendEndOfRow(CopyToState cstate)
+{
+	switch (cstate->copy_dest)
+	{
+	case COPY_FILE:
+		/* Default line termination depends on platform */
+#ifndef WIN32
+		CopySendChar(cstate, '\n');
+#else
+		CopySendString(cstate, "\r\n");
+#endif
+		break;
+	case COPY_FRONTEND:
+		/* The FE/BE protocol uses \n as newline for all platforms */
+		CopySendChar(cstate, '\n');
+		break;
+	default:
+		break;
+	}
+	CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	int			num_phys_attrs;
+	ListCell   *cur;
+
+	num_phys_attrs = tupDesc->natts;
+	/* Get info about the columns we need to process. */
+	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Oid			out_func_oid;
+		bool		isvarlena;
+		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+		getTypeOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+	}
+
+	/*
+	 * For non-binary copy, we need to convert null_print to file
+	 * encoding, because it will be sent directly with CopySendString.
+	 */
+	if (cstate->need_transcoding)
+		cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
+														  cstate->opts.null_print_len,
+														  cstate->file_encoding);
+
+	/* if a header has been requested send the line */
+	if (cstate->opts.header_line)
+	{
+		bool		hdr_delim = false;
+
+		foreach(cur, cstate->attnumlist)
+		{
+			int			attnum = lfirst_int(cur);
+			char	   *colname;
+
+			if (hdr_delim)
+				CopySendChar(cstate, cstate->opts.delim[0]);
+			hdr_delim = true;
+
+			colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
+
+			if (cstate->opts.csv_mode)
+				CopyAttributeOutCSV(cstate, colname, false,
+									list_length(cstate->attnumlist) == 1);
+			else
+				CopyAttributeOutText(cstate, colname);
+		}
+
+		CopyToFormatTextSendEndOfRow(cstate);
+	}
+}
+
+void
+CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	bool		need_delim = false;
+	FmgrInfo   *out_functions = cstate->out_functions;
+	ListCell   *cur;
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
+
+		if (need_delim)
+			CopySendChar(cstate, cstate->opts.delim[0]);
+		need_delim = true;
+
+		if (isnull)
+			CopySendString(cstate, cstate->opts.null_print_client);
+		else
+		{
+			char	   *string;
+
+			string = OutputFunctionCall(&out_functions[attnum - 1], value);
+			if (cstate->opts.csv_mode)
+				CopyAttributeOutCSV(cstate, string,
+									cstate->opts.force_quote_flags[attnum - 1],
+									list_length(cstate->attnumlist) == 1);
+			else
+				CopyAttributeOutText(cstate, string);
+		}
+	}
+
+	CopyToFormatTextSendEndOfRow(cstate);
+}
+
+void
+CopyToFormatTextEnd(CopyToState cstate)
+{
+}
+
+/*
+ * CopyHandlerOps implementation for "binary" COPY TO.
+ */
+
+void
+CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc)
+{
+	int			num_phys_attrs;
+	ListCell   *cur;
+
+	num_phys_attrs = tupDesc->natts;
+	/* Get info about the columns we need to process. */
+	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Oid			out_func_oid;
+		bool		isvarlena;
+		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
+
+		getTypeBinaryOutputInfo(attr->atttypid, &out_func_oid, &isvarlena);
+		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
+	}
+
+	/* Generate header for a binary copy */
+	/* Signature */
+	CopySendData(cstate, BinarySignature, 11);
+	/* Flags field */
+	CopySendInt32(cstate, 0);
+	/* No header extension */
+	CopySendInt32(cstate, 0);
+}
+
+void
+CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot)
+{
+	FmgrInfo   *out_functions = cstate->out_functions;
+	ListCell   *cur;
+
+	/* Binary per-tuple header */
+	CopySendInt16(cstate, list_length(cstate->attnumlist));
+
+	foreach(cur, cstate->attnumlist)
+	{
+		int			attnum = lfirst_int(cur);
+		Datum		value = slot->tts_values[attnum - 1];
+		bool		isnull = slot->tts_isnull[attnum - 1];
+
+		if (isnull)
+			CopySendInt32(cstate, -1);
+		else
+		{
+			bytea	   *outputbytes;
+
+			outputbytes = SendFunctionCall(&out_functions[attnum - 1], value);
+			CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
+			CopySendData(cstate, VARDATA(outputbytes),
+						 VARSIZE(outputbytes) - VARHDRSZ);
+		}
+	}
+
+	CopySendEndOfRow(cstate);
+}
+
+void
+CopyToFormatBinaryEnd(CopyToState cstate)
+{
+	/* Generate trailer for a binary copy */
+	CopySendInt16(cstate, -1);
+	/* Need to flush out the trailer */
+	CopySendEndOfRow(cstate);
+}
 
 /*
  * Send copy start/stop messages for frontend copies.  These have changed
@@ -198,16 +397,6 @@ CopySendEndOfRow(CopyToState cstate)
 	switch (cstate->copy_dest)
 	{
 		case COPY_FILE:
-			if (!cstate->opts.binary)
-			{
-				/* Default line termination depends on platform */
-#ifndef WIN32
-				CopySendChar(cstate, '\n');
-#else
-				CopySendString(cstate, "\r\n");
-#endif
-			}
-
 			if (fwrite(fe_msgbuf->data, fe_msgbuf->len, 1,
 					   cstate->copy_file) != 1 ||
 				ferror(cstate->copy_file))
@@ -242,10 +431,6 @@ CopySendEndOfRow(CopyToState cstate)
 			}
 			break;
 		case COPY_FRONTEND:
-			/* The FE/BE protocol uses \n as newline for all platforms */
-			if (!cstate->opts.binary)
-				CopySendChar(cstate, '\n');
-
 			/* Dump the accumulated row as one CopyData message */
 			(void) pq_putmessage(PqMsg_CopyData, fe_msgbuf->data, fe_msgbuf->len);
 			break;
@@ -748,8 +933,6 @@ DoCopyTo(CopyToState cstate)
 	bool		pipe = (cstate->filename == NULL && cstate->data_dest_cb == NULL);
 	bool		fe_copy = (pipe && whereToSendOutput == DestRemote);
 	TupleDesc	tupDesc;
-	int			num_phys_attrs;
-	ListCell   *cur;
 	uint64		processed;
 
 	if (fe_copy)
@@ -759,32 +942,11 @@ DoCopyTo(CopyToState cstate)
 		tupDesc = RelationGetDescr(cstate->rel);
 	else
 		tupDesc = cstate->queryDesc->tupDesc;
-	num_phys_attrs = tupDesc->natts;
 	cstate->opts.null_print_client = cstate->opts.null_print;	/* default */
 
 	/* We use fe_msgbuf as a per-row buffer regardless of copy_dest */
 	cstate->fe_msgbuf = makeStringInfo();
 
-	/* Get info about the columns we need to process. */
-	cstate->out_functions = (FmgrInfo *) palloc(num_phys_attrs * sizeof(FmgrInfo));
-	foreach(cur, cstate->attnumlist)
-	{
-		int			attnum = lfirst_int(cur);
-		Oid			out_func_oid;
-		bool		isvarlena;
-		Form_pg_attribute attr = TupleDescAttr(tupDesc, attnum - 1);
-
-		if (cstate->opts.binary)
-			getTypeBinaryOutputInfo(attr->atttypid,
-									&out_func_oid,
-									&isvarlena);
-		else
-			getTypeOutputInfo(attr->atttypid,
-							  &out_func_oid,
-							  &isvarlena);
-		fmgr_info(out_func_oid, &cstate->out_functions[attnum - 1]);
-	}
-
 	/*
 	 * Create a temporary memory context that we can reset once per row to
 	 * recover palloc'd memory.  This avoids any problems with leaks inside
@@ -795,57 +957,7 @@ DoCopyTo(CopyToState cstate)
 											   "COPY TO",
 											   ALLOCSET_DEFAULT_SIZES);
 
-	if (cstate->opts.binary)
-	{
-		/* Generate header for a binary copy */
-		int32		tmp;
-
-		/* Signature */
-		CopySendData(cstate, BinarySignature, 11);
-		/* Flags field */
-		tmp = 0;
-		CopySendInt32(cstate, tmp);
-		/* No header extension */
-		tmp = 0;
-		CopySendInt32(cstate, tmp);
-	}
-	else
-	{
-		/*
-		 * For non-binary copy, we need to convert null_print to file
-		 * encoding, because it will be sent directly with CopySendString.
-		 */
-		if (cstate->need_transcoding)
-			cstate->opts.null_print_client = pg_server_to_any(cstate->opts.null_print,
-															  cstate->opts.null_print_len,
-															  cstate->file_encoding);
-
-		/* if a header has been requested send the line */
-		if (cstate->opts.header_line)
-		{
-			bool		hdr_delim = false;
-
-			foreach(cur, cstate->attnumlist)
-			{
-				int			attnum = lfirst_int(cur);
-				char	   *colname;
-
-				if (hdr_delim)
-					CopySendChar(cstate, cstate->opts.delim[0]);
-				hdr_delim = true;
-
-				colname = NameStr(TupleDescAttr(tupDesc, attnum - 1)->attname);
-
-				if (cstate->opts.csv_mode)
-					CopyAttributeOutCSV(cstate, colname, false,
-										list_length(cstate->attnumlist) == 1);
-				else
-					CopyAttributeOutText(cstate, colname);
-			}
-
-			CopySendEndOfRow(cstate);
-		}
-	}
+	cstate->opts.handler->to_start(cstate, tupDesc);
 
 	if (cstate->rel)
 	{
@@ -884,13 +996,7 @@ DoCopyTo(CopyToState cstate)
 		processed = ((DR_copy *) cstate->queryDesc->dest)->processed;
 	}
 
-	if (cstate->opts.binary)
-	{
-		/* Generate trailer for a binary copy */
-		CopySendInt16(cstate, -1);
-		/* Need to flush out the trailer */
-		CopySendEndOfRow(cstate);
-	}
+	cstate->opts.handler->to_end(cstate);
 
 	MemoryContextDelete(cstate->rowcontext);
 
@@ -906,71 +1012,15 @@ DoCopyTo(CopyToState cstate)
 static void
 CopyOneRowTo(CopyToState cstate, TupleTableSlot *slot)
 {
-	bool		need_delim = false;
-	FmgrInfo   *out_functions = cstate->out_functions;
 	MemoryContext oldcontext;
-	ListCell   *cur;
-	char	   *string;
 
 	MemoryContextReset(cstate->rowcontext);
 	oldcontext = MemoryContextSwitchTo(cstate->rowcontext);
 
-	if (cstate->opts.binary)
-	{
-		/* Binary per-tuple header */
-		CopySendInt16(cstate, list_length(cstate->attnumlist));
-	}
-
 	/* Make sure the tuple is fully deconstructed */
 	slot_getallattrs(slot);
 
-	foreach(cur, cstate->attnumlist)
-	{
-		int			attnum = lfirst_int(cur);
-		Datum		value = slot->tts_values[attnum - 1];
-		bool		isnull = slot->tts_isnull[attnum - 1];
-
-		if (!cstate->opts.binary)
-		{
-			if (need_delim)
-				CopySendChar(cstate, cstate->opts.delim[0]);
-			need_delim = true;
-		}
-
-		if (isnull)
-		{
-			if (!cstate->opts.binary)
-				CopySendString(cstate, cstate->opts.null_print_client);
-			else
-				CopySendInt32(cstate, -1);
-		}
-		else
-		{
-			if (!cstate->opts.binary)
-			{
-				string = OutputFunctionCall(&out_functions[attnum - 1],
-											value);
-				if (cstate->opts.csv_mode)
-					CopyAttributeOutCSV(cstate, string,
-										cstate->opts.force_quote_flags[attnum - 1],
-										list_length(cstate->attnumlist) == 1);
-				else
-					CopyAttributeOutText(cstate, string);
-			}
-			else
-			{
-				bytea	   *outputbytes;
-
-				outputbytes = SendFunctionCall(&out_functions[attnum - 1],
-											   value);
-				CopySendInt32(cstate, VARSIZE(outputbytes) - VARHDRSZ);
-				CopySendData(cstate, VARDATA(outputbytes),
-							 VARSIZE(outputbytes) - VARHDRSZ);
-			}
-		}
-	}
-
-	CopySendEndOfRow(cstate);
+	cstate->opts.handler->to_one_row(cstate, slot);
 
 	MemoryContextSwitchTo(oldcontext);
 }
diff --git a/src/backend/nodes/Makefile b/src/backend/nodes/Makefile
index ebbe9052cb..e64e121c01 100644
--- a/src/backend/nodes/Makefile
+++ b/src/backend/nodes/Makefile
@@ -50,6 +50,7 @@ node_headers = \
 	access/sdir.h \
 	access/tableam.h \
 	access/tsmapi.h \
+	commands/copy.h \
 	commands/event_trigger.h \
 	commands/trigger.h \
 	executor/tuptable.h \
diff --git a/src/backend/nodes/gen_node_support.pl b/src/backend/nodes/gen_node_support.pl
index 72c7963578..237ac42742 100644
--- a/src/backend/nodes/gen_node_support.pl
+++ b/src/backend/nodes/gen_node_support.pl
@@ -61,6 +61,7 @@ my @all_input_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copy.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
@@ -85,6 +86,7 @@ my @nodetag_only_files = qw(
   access/sdir.h
   access/tableam.h
   access/tsmapi.h
+  commands/copy.h
   commands/event_trigger.h
   commands/trigger.h
   executor/tuptable.h
diff --git a/src/backend/utils/adt/pseudotypes.c b/src/backend/utils/adt/pseudotypes.c
index 3ba8cb192c..8f12927a4d 100644
--- a/src/backend/utils/adt/pseudotypes.c
+++ b/src/backend/utils/adt/pseudotypes.c
@@ -378,3 +378,4 @@ PSEUDOTYPE_DUMMY_IO_FUNCS(anyelement);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anynonarray);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anycompatible);
 PSEUDOTYPE_DUMMY_IO_FUNCS(anycompatiblenonarray);
+PSEUDOTYPE_DUMMY_IO_FUNCS(copy_handler);
diff --git a/src/include/catalog/meson.build b/src/include/catalog/meson.build
index dcb3c5f766..d9aa3f14ba 100644
--- a/src/include/catalog/meson.build
+++ b/src/include/catalog/meson.build
@@ -69,6 +69,7 @@ catalog_headers = [
   'pg_publication_rel.h',
   'pg_subscription.h',
   'pg_subscription_rel.h',
+  'pg_copy_handler.h',
 ]
 
 # The .dat files we need can just be listed alphabetically.
@@ -97,6 +98,7 @@ bki_data = [
   'pg_ts_parser.dat',
   'pg_ts_template.dat',
   'pg_type.dat',
+  'pg_copy_handler.dat',
 ]
 bki_data_f = files(bki_data)
 
diff --git a/src/include/catalog/pg_copy_handler.dat b/src/include/catalog/pg_copy_handler.dat
new file mode 100644
index 0000000000..052329cb53
--- /dev/null
+++ b/src/include/catalog/pg_copy_handler.dat
@@ -0,0 +1,25 @@
+#----------------------------------------------------------------------
+#
+# pg_copy_handler.dat
+#    Initial contents of the pg_copy_handler system catalog.
+#
+# Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+# Portions Copyright (c) 1994, Regents of the University of California
+#
+# src/include/catalog/pg_copy_handler.dat
+#
+#----------------------------------------------------------------------
+
+[
+
+{ oid => '4554', oid_symbol => 'TEXT_COPY_HANDLER_OID',
+  descr => 'text copy handler',
+  chname => 'text', copyhandler => 'text_copy_handler'},
+{ oid => '4555', oid_symbol => 'CSV_COPY_HANDLER_OID',
+  descr => 'csv copy handler',
+  chname => 'csv', copyhandler => 'csv_copy_handler'},
+{ oid => '4556', oid_symbol => 'BINARY_COPY_HANDLER_OID',
+  descr => 'binary copy handler',
+  chname => 'binary', copyhandler => 'binary_copy_handler'},
+
+]
diff --git a/src/include/catalog/pg_copy_handler.h b/src/include/catalog/pg_copy_handler.h
new file mode 100644
index 0000000000..74ad06f4d6
--- /dev/null
+++ b/src/include/catalog/pg_copy_handler.h
@@ -0,0 +1,50 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_copy_handler.h
+ *	  definition of the "copy handler" system catalog (pg_copy_handler)
+ *
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/catalog/pg_copy_handler.h
+ *
+ * NOTES
+ *	  The Catalog.pm module reads this file and derives schema
+ *	  information.
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_COPY_HANDLER_H
+#define PG_COPY_HANDLER_H
+
+#include "catalog/genbki.h"
+#include "catalog/pg_copy_handler_d.h"
+
+/* ----------------
+ *		pg_copy_handler definition.  cpp turns this into
+ *		typedef struct FormData_pg_copy_handler
+ * ----------------
+ */
+CATALOG(pg_copy_handler,4551,CopyHandlerRelationId)
+{
+	Oid			oid;			/* oid */
+
+	/* copy handler name */
+	NameData	chname;
+
+	/* handler function */
+	regproc		copyhandler BKI_LOOKUP(pg_proc);
+} FormData_pg_copy_handler;
+
+/* ----------------
+ *		Form_pg_copy_handler corresponds to a pointer to a tuple with
+ *		the format of pg_copy_handler relation.
+ * ----------------
+ */
+typedef FormData_pg_copy_handler *Form_pg_copy_handler;
+
+DECLARE_UNIQUE_INDEX(pg_copy_handler_name_index, 4552, CopyHandlerNameIndexId, pg_copy_handler, btree(chname name_ops));
+DECLARE_UNIQUE_INDEX_PKEY(pg_copy_handler_oid_index, 4553, CopyHandlerOidIndexId, pg_copy_handler, btree(oid oid_ops));
+
+#endif							/* PG_COPY_HANDLER_H */
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index fb58dee3bc..8b9b60c90f 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -948,6 +948,20 @@
   prorettype => 'void', proargtypes => 'regclass int8',
   prosrc => 'brin_desummarize_range' },
 
+# Copy handlers
+{ oid => '4557', descr => 'text copy handler',
+  proname => 'text_copy_handler', provolatile => 'v',
+  prorettype => 'copy_handler', proargtypes => 'internal',
+  prosrc => 'text_copy_handler' },
+{ oid => '4558', descr => 'csv copy handler',
+  proname => 'csv_copy_handler', provolatile => 'v',
+  prorettype => 'copy_handler', proargtypes => 'internal',
+  prosrc => 'csv_copy_handler' },
+{ oid => '4559', descr => 'binary copy handler',
+  proname => 'binary_copy_handler', provolatile => 'v',
+  prorettype => 'copy_handler', proargtypes => 'internal',
+  prosrc => 'binary_copy_handler' },
+
 { oid => '338', descr => 'validate an operator class',
   proname => 'amvalidate', provolatile => 'v', prorettype => 'bool',
   proargtypes => 'oid', prosrc => 'amvalidate' },
@@ -7609,6 +7623,13 @@
 { oid => '268', descr => 'I/O',
   proname => 'table_am_handler_out', prorettype => 'cstring',
   proargtypes => 'table_am_handler', prosrc => 'table_am_handler_out' },
+{ oid => '388', descr => 'I/O',
+  proname => 'copy_handler_in', proisstrict => 'f',
+  prorettype => 'copy_handler', proargtypes => 'cstring',
+  prosrc => 'copy_handler_in' },
+{ oid => '389', descr => 'I/O',
+  proname => 'copy_handler_out', prorettype => 'cstring',
+  proargtypes => 'copy_handler', prosrc => 'copy_handler_out' },
 { oid => '5086', descr => 'I/O',
   proname => 'anycompatible_in', prorettype => 'anycompatible',
   proargtypes => 'cstring', prosrc => 'anycompatible_in' },
diff --git a/src/include/catalog/pg_type.dat b/src/include/catalog/pg_type.dat
index f6110a850d..aa6d731cb5 100644
--- a/src/include/catalog/pg_type.dat
+++ b/src/include/catalog/pg_type.dat
@@ -639,6 +639,13 @@
   typcategory => 'P', typinput => 'table_am_handler_in',
   typoutput => 'table_am_handler_out', typreceive => '-', typsend => '-',
   typalign => 'i' },
+{ oid => '3814',
+  typname => 'copy_handler',
+  descr => 'pseudo-type for the result of a copy handler',
+  typlen => '4', typbyval => 't', typtype => 'p',
+  typcategory => 'P', typinput => 'copy_handler_in',
+  typoutput => 'copy_handler_out', typreceive => '-', typsend => '-',
+  typalign => 'i' },
 { oid => '3831',
   descr => 'pseudo-type representing a range over a polymorphic base type',
   typname => 'anyrange', typlen => '-1', typbyval => 'f', typtype => 'p',
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index f2cca0b90b..c2af6f2aff 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -30,6 +30,33 @@ typedef enum CopyHeaderChoice
 	COPY_HEADER_MATCH,
 } CopyHeaderChoice;
 
+#define DEFAULT_COPY_HANDLER		"text"
+
+/* These are private in commands/copy[from|to].c */
+typedef struct CopyFromStateData *CopyFromState;
+typedef struct CopyToStateData *CopyToState;
+
+/* Routines for a COPY HANDLER implementation. */
+typedef struct CopyRoutine
+{
+	/* this must be set to T_CopyRoutine */
+	NodeTag		type;
+
+	/* Called when COPY TO is started. This will send a header. */
+	void		(*to_start) (CopyToState cstate, TupleDesc tupDesc);
+
+	/* Copy one row for COPY TO. */
+	void		(*to_one_row) (CopyToState cstate, TupleTableSlot *slot);
+
+	/* Called when COPY TO is ended. This will send a trailer. */
+	void		(*to_end) (CopyToState cstate);
+
+	void		(*from_start) (CopyFromState cstate, TupleDesc tupDesc);
+	bool		(*from_next) (CopyFromState cstate, ExprContext *econtext,
+							  Datum *values, bool *nulls);
+	void		(*from_error_callback) (CopyFromState cstate);
+} CopyRoutine;
+
 /*
  * A struct to hold COPY options, in a parsed form. All of these are related
  * to formatting, except for 'freeze', which doesn't really belong here, but
@@ -63,12 +90,9 @@ typedef struct CopyFormatOptions
 	bool	   *force_null_flags;	/* per-column CSV FN flags */
 	bool		convert_selectively;	/* do selective binary conversion? */
 	List	   *convert_select; /* list of column names (can be NIL) */
+	CopyRoutine	   *handler;		/* copy handler routines */
 } CopyFormatOptions;
 
-/* These are private in commands/copy[from|to].c */
-typedef struct CopyFromStateData *CopyFromState;
-typedef struct CopyToStateData *CopyToState;
-
 typedef int (*copy_data_source_cb) (void *outbuf, int minread, int maxread);
 typedef void (*copy_data_dest_cb) (void *data, int len);
 
@@ -91,6 +115,9 @@ extern uint64 CopyFrom(CopyFromState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern CopyRoutine *GetCopyRoutine(Oid copyhandler);
+extern CopyRoutine *GetCopyRoutineByName(char *fmt);
+
 /*
  * internal prototypes
  */
@@ -102,4 +129,20 @@ extern uint64 DoCopyTo(CopyToState cstate);
 extern List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
 							List *attnamelist);
 
+extern void CopyToFormatTextStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatTextOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatTextEnd(CopyToState cstate);
+extern void CopyFromFormatTextStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatTextNext(CopyFromState cstate, ExprContext *econtext,
+								   Datum *values, bool *nulls);
+extern void CopyFromFormatTextErrorCallback(CopyFromState cstate);
+
+extern void CopyToFormatBinaryStart(CopyToState cstate, TupleDesc tupDesc);
+extern void CopyToFormatBinaryOneRow(CopyToState cstate, TupleTableSlot *slot);
+extern void CopyToFormatBinaryEnd(CopyToState cstate);
+extern void CopyFromFormatBinaryStart(CopyFromState cstate, TupleDesc tupDesc);
+extern bool CopyFromFormatBinaryNext(CopyFromState cstate, ExprContext *econtext,
+									 Datum *values, bool *nulls);
+extern void CopyFromFormatBinaryErrorCallback(CopyFromState cstate);
+
 #endif							/* COPY_H */
diff --git a/src/test/regress/expected/oidjoins.out b/src/test/regress/expected/oidjoins.out
index 215eb899be..a1ab73dc37 100644
--- a/src/test/regress/expected/oidjoins.out
+++ b/src/test/regress/expected/oidjoins.out
@@ -266,3 +266,4 @@ NOTICE:  checking pg_subscription {subdbid} => pg_database {oid}
 NOTICE:  checking pg_subscription {subowner} => pg_authid {oid}
 NOTICE:  checking pg_subscription_rel {srsubid} => pg_subscription {oid}
 NOTICE:  checking pg_subscription_rel {srrelid} => pg_class {oid}
+NOTICE:  checking pg_copy_handler {copyhandler} => pg_proc {oid}
-- 
2.41.0

