 contrib/file_fdw/file_fdw.c |  130 ++++++++++++++++++++++++++++++++++++++++++-
 src/backend/commands/copy.c |   76 ++++++++++++++++++++-----
 2 files changed, 189 insertions(+), 17 deletions(-)

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index e3b9223..fd9412f 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -16,6 +16,7 @@
 #include <unistd.h>
 
 #include "access/reloptions.h"
+#include "access/sysattr.h"
 #include "catalog/pg_foreign_table.h"
 #include "commands/copy.h"
 #include "commands/defrem.h"
@@ -29,6 +30,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
+#include "optimizer/var.h"
 #include "utils/memutils.h"
 #include "utils/rel.h"
 
@@ -136,6 +138,9 @@ static bool is_valid_option(const char *option, Oid context);
 static void fileGetOptions(Oid foreigntableid,
 			   char **filename, List **other_options);
 static List *get_file_fdw_attribute_options(Oid relid);
+static bool check_selective_binary_conversion(RelOptInfo *baserel,
+											  Oid foreigntableid,
+											  List **columns);
 static void estimate_size(PlannerInfo *root, RelOptInfo *baserel,
 			  FileFdwPlanState *fdw_private);
 static void estimate_costs(PlannerInfo *root, RelOptInfo *baserel,
@@ -457,6 +462,16 @@ fileGetForeignPaths(PlannerInfo *root,
 	FileFdwPlanState *fdw_private = (FileFdwPlanState *) baserel->fdw_private;
 	Cost		startup_cost;
 	Cost		total_cost;
+	bool		result;
+	List	   *columns;
+	List	   *coption = NIL;
+
+	/* Decide whether to selectively perform binary conversion */
+	result = check_selective_binary_conversion(baserel,
+											   foreigntableid,
+											   &columns);
+	if (result)
+		coption = list_make1(makeDefElem("convert_binary", (Node *) columns));
 
 	/* Estimate costs */
 	estimate_costs(root, baserel, fdw_private,
@@ -470,7 +485,7 @@ fileGetForeignPaths(PlannerInfo *root,
 									 total_cost,
 									 NIL,		/* no pathkeys */
 									 NULL,		/* no outer rel either */
-									 NIL));		/* no fdw_private data */
+									 coption));
 
 	/*
 	 * If data file was sorted, and we knew it somehow, we could insert
@@ -507,7 +522,7 @@ fileGetForeignPlan(PlannerInfo *root,
 							scan_clauses,
 							scan_relid,
 							NIL,	/* no expressions to evaluate */
-							NIL);		/* no private state either */
+							best_path->fdw_private);
 }
 
 /*
@@ -544,6 +559,7 @@ fileExplainForeignScan(ForeignScanState *node, ExplainState *es)
 static void
 fileBeginForeignScan(ForeignScanState *node, int eflags)
 {
+	ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
 	char	   *filename;
 	List	   *options;
 	CopyState	cstate;
@@ -559,6 +575,10 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
 	fileGetOptions(RelationGetRelid(node->ss.ss_currentRelation),
 				   &filename, &options);
 
+	/* Add an option for selective binary conversion */
+	if(plan->fdw_private != NIL)
+		options = list_concat(options, plan->fdw_private);
+
 	/*
 	 * Create CopyState from FDW options.  We always acquire all columns, so
 	 * as to match the expected ScanTupleSlot signature.
@@ -695,6 +715,112 @@ fileAnalyzeForeignTable(Relation relation,
 }
 
 /*
+ * check_selective_binary_conversion
+ */
+static bool
+check_selective_binary_conversion(RelOptInfo *baserel,
+								  Oid foreigntableid,
+								  List **columns)
+{
+	ForeignTable *table;
+	ListCell   *lc;
+	Relation	rel;
+	TupleDesc	tupleDesc;
+	AttrNumber	attnum;
+	Bitmapset  *attrs_used = NULL;
+	Bitmapset  *tmpset;
+	int			cnt;
+	int			i;
+
+	*columns = NIL;
+
+	/*
+	 * Examine format of the file.  If binary format, we don't need to convert
+	 * at all.
+	 */
+	table = GetForeignTable(foreigntableid);
+	foreach(lc, table->options)
+	{
+		DefElem    *def = (DefElem *) lfirst(lc);
+
+		if (strcmp(def->defname, "format") == 0)
+		{
+			char	   *format = defGetString(def);
+
+			if (strcmp(format, "binary") == 0)
+				return false;
+			break;
+		}
+	}
+
+	/* Add all the attributes needed for joins or final output. */
+	pull_varattnos((Node *) baserel->reltargetlist, baserel->relid, &attrs_used);
+
+	/* Add all the attributes used by restriction clauses. */
+	foreach(lc, baserel->baserestrictinfo)
+	{
+		RestrictInfo   *rinfo = (RestrictInfo *) lfirst(lc);
+
+		pull_varattnos((Node *) rinfo->clause, baserel->relid, &attrs_used);
+	}
+
+	rel = heap_open(foreigntableid, AccessShareLock);
+	tupleDesc = RelationGetDescr(rel);
+
+	tmpset = bms_copy(attrs_used);
+	while ((attnum = bms_first_member(tmpset)) >= 0)
+	{
+		/* Adjust for system attributes. */
+		attnum += FirstLowInvalidHeapAttributeNumber;
+
+		/* If whole-row reference, give up. */
+		if (attnum == 0)
+		{
+			*columns = NIL;
+			return false;
+		}
+
+		/* Ignore system attributes. */
+		if (attnum < 0)
+			continue;
+
+		/* Get user attributes. */
+		if (attnum > 0)
+		{
+			Form_pg_attribute attr = tupleDesc->attrs[attnum - 1];
+			char	   *attname = pstrdup(NameStr(attr->attname));
+
+			/* Skip dropped attributes. */
+			if (attr->attisdropped)
+				continue;
+			*columns = lappend(*columns, makeString(attname));
+		}
+	}
+	bms_free(tmpset);
+
+	heap_close(rel, AccessShareLock);
+
+	/* If all the user attributes needed, give up. */
+	cnt = 0;
+	for (i = 0; i < tupleDesc->natts; i++)
+	{
+		Form_pg_attribute attr = tupleDesc->attrs[i];
+
+		/* Skip dropped attributes. */
+		if (attr->attisdropped)
+			continue;
+		cnt++;
+	}
+	if (cnt == list_length(*columns))
+	{
+		*columns = NIL;
+		return false;
+	}
+
+	return true;
+}
+
+/*
  * Estimate size of a foreign table.
  *
  * The main result is returned in baserel->rows.  We also set
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 98bcb2f..c74c7df 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -122,6 +122,11 @@ typedef struct CopyStateData
 	List	   *force_notnull;	/* list of column names */
 	bool	   *force_notnull_flags;	/* per-column CSV FNN flags */
 
+	/* parameters from contrib/file_fdw */
+	List	   *convert_binary;	/* list of column names */
+	bool	   *convert_binary_flags;	/* per-column CSV/TEXT CB flags */
+	bool		convert_selectively;	/* selective binary conversion? */
+
 	/* these are just for error messages, see CopyFromErrorCallback */
 	const char *cur_relname;	/* table name for error messages */
 	int			cur_lineno;		/* line number for error messages */
@@ -961,6 +966,21 @@ ProcessCopyOptions(CopyState cstate,
 						 errmsg("argument to option \"%s\" must be a list of column names",
 								defel->defname)));
 		}
+		else if (strcmp(defel->defname, "convert_binary") == 0)
+		{
+			if (cstate->convert_selectively)
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("conflicting or redundant options")));
+			cstate->convert_selectively = true;
+			if (defel->arg == NULL || IsA(defel->arg, List))
+				cstate->convert_binary = (List *) defel->arg;
+			else
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+						 errmsg("argument to option \"%s\" must be a list of column names",
+								defel->defname)));
+		}
 		else if (strcmp(defel->defname, "encoding") == 0)
 		{
 			if (cstate->file_encoding >= 0)
@@ -1307,6 +1327,28 @@ BeginCopy(bool is_from,
 		}
 	}
 
+	/* Convert convert binary name list to per-column flags, check validity */
+	cstate->convert_binary_flags = (bool *) palloc0(num_phys_attrs * sizeof(bool));
+	if (cstate->convert_binary)
+	{
+		List	   *attnums;
+		ListCell   *cur;
+
+		attnums = CopyGetAttnums(tupDesc, cstate->rel, cstate->convert_binary);
+
+		foreach(cur, attnums)
+		{
+			int			attnum = lfirst_int(cur);
+
+			if (!list_member_int(cstate->attnumlist, attnum))
+				ereport(ERROR,
+						(errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
+				errmsg("selected column \"%s\" not referenced by COPY",
+					   NameStr(tupDesc->attrs[attnum - 1]->attname))));
+			cstate->convert_binary_flags[attnum - 1] = true;
+		}
+	}
+
 	/* Use client encoding when ENCODING option is not specified. */
 	if (cstate->file_encoding < 0)
 		cstate->file_encoding = pg_get_client_encoding();
@@ -2565,23 +2607,27 @@ NextCopyFrom(CopyState cstate, ExprContext *econtext,
 								NameStr(attr[m]->attname))));
 			string = field_strings[fieldno++];
 
-			if (cstate->csv_mode && string == NULL &&
-				cstate->force_notnull_flags[m])
+			if (!cstate->convert_selectively ||
+				cstate->convert_binary_flags[m])
 			{
-				/* Go ahead and read the NULL string */
-				string = cstate->null_print;
-			}
+				if (cstate->csv_mode && string == NULL &&
+					cstate->force_notnull_flags[m])
+				{
+					/* Go ahead and read the NULL string */
+					string = cstate->null_print;
+				}
 
-			cstate->cur_attname = NameStr(attr[m]->attname);
-			cstate->cur_attval = string;
-			values[m] = InputFunctionCall(&in_functions[m],
-										  string,
-										  typioparams[m],
-										  attr[m]->atttypmod);
-			if (string != NULL)
-				nulls[m] = false;
-			cstate->cur_attname = NULL;
-			cstate->cur_attval = NULL;
+				cstate->cur_attname = NameStr(attr[m]->attname);
+				cstate->cur_attval = string;
+				values[m] = InputFunctionCall(&in_functions[m],
+											  string,
+											  typioparams[m],
+											  attr[m]->atttypmod);
+				if (string != NULL)
+					nulls[m] = false;
+				cstate->cur_attname = NULL;
+				cstate->cur_attval = NULL;
+			}
 		}
 
 		Assert(fieldno == nfields);
