From 9d8aa2c1e9f12e451d586de0a2d0d8989f638789 Mon Sep 17 00:00:00 2001
From: Thomas Munro <thomas.munro@gmail.com>
Date: Thu, 2 May 2019 19:36:44 +1200
Subject: [PATCH] Make file_fdw parallel-aware.

Teach file_fdw how to divide its scan up for parallel workers, much like
parallel seq scan.  The complication is that lines might span chunks.
Deal with that by making the owner of the previous chunk responsible for
the first line in the following chunk.

Author: Thomas Munro
Discussion: https://postgr.es/m/CA%2Bz6ocRFEnThhXye3F9_ZjSMDSfOGdnGOt8hnXsVgGmFpNTYFA%40mail.gmail.com
---
 contrib/file_fdw/file_fdw.c           | 86 ++++++++++++++++++++++-
 src/backend/commands/copy.c           | 99 ++++++++++++++++++++++++++-
 src/backend/optimizer/path/costsize.c |  3 +-
 src/include/commands/copy.h           |  4 ++
 src/include/optimizer/cost.h          |  1 +
 5 files changed, 189 insertions(+), 4 deletions(-)

diff --git a/contrib/file_fdw/file_fdw.c b/contrib/file_fdw/file_fdw.c
index 85534a3a76..f093d6e538 100644
--- a/contrib/file_fdw/file_fdw.c
+++ b/contrib/file_fdw/file_fdw.c
@@ -29,8 +29,10 @@
 #include "foreign/foreign.h"
 #include "miscadmin.h"
 #include "nodes/makefuncs.h"
+#include "optimizer/cost.h"
 #include "optimizer/optimizer.h"
 #include "optimizer/pathnode.h"
+#include "optimizer/paths.h"
 #include "optimizer/planmain.h"
 #include "optimizer/restrictinfo.h"
 #include "utils/memutils.h"
@@ -39,6 +41,9 @@
 
 PG_MODULE_MAGIC;
 
+/* The size of the chunks used for parallel scans, in bytes. */
+#define PARALLEL_STEP_SIZE 8192
+
 /*
  * Describes the valid options for objects that use this wrapper.
  */
@@ -105,6 +110,8 @@ typedef struct FileFdwExecutionState
 	List	   *options;		/* merged COPY options, excluding filename and
 								 * is_program */
 	CopyState	cstate;			/* COPY execution state */
+
+	pg_atomic_uint64 *next_parallel_step_offset;
 } FileFdwExecutionState;
 
 /*
@@ -139,6 +146,17 @@ static bool fileAnalyzeForeignTable(Relation relation,
 						BlockNumber *totalpages);
 static bool fileIsForeignScanParallelSafe(PlannerInfo *root, RelOptInfo *rel,
 							  RangeTblEntry *rte);
+static Size fileEstimateDSMForeignScan(ForeignScanState *node,
+									   ParallelContext *pcxt);
+static void fileInitializeDSMForeignScan(ForeignScanState *node,
+										 ParallelContext *pcxt,
+										 void *coordinate);
+static void fileReInitializeDSMForeignScan(ForeignScanState *node,
+										   ParallelContext *pcxt,
+										   void *coordinate);
+static void fileInitializeWorkerForeignScan(ForeignScanState *node,
+											shm_toc *toc,
+											void *coordinate);
 
 /*
  * Helper functions
@@ -181,6 +199,10 @@ file_fdw_handler(PG_FUNCTION_ARGS)
 	fdwroutine->EndForeignScan = fileEndForeignScan;
 	fdwroutine->AnalyzeForeignTable = fileAnalyzeForeignTable;
 	fdwroutine->IsForeignScanParallelSafe = fileIsForeignScanParallelSafe;
+	fdwroutine->EstimateDSMForeignScan = fileEstimateDSMForeignScan;
+	fdwroutine->InitializeDSMForeignScan = fileInitializeDSMForeignScan;
+	fdwroutine->ReInitializeDSMForeignScan = fileReInitializeDSMForeignScan;
+	fdwroutine->InitializeWorkerForeignScan = fileInitializeWorkerForeignScan;
 
 	PG_RETURN_POINTER(fdwroutine);
 }
@@ -540,6 +562,9 @@ fileGetForeignPaths(PlannerInfo *root,
 	Cost		total_cost;
 	List	   *columns;
 	List	   *coptions = NIL;
+	ForeignPath *partial_path;
+	double		parallel_divisor;
+	int			parallel_workers;
 
 	/* Decide whether to selectively perform binary conversion */
 	if (check_selective_binary_conversion(baserel,
@@ -577,6 +602,29 @@ fileGetForeignPaths(PlannerInfo *root,
 	 * appropriate pathkeys into the ForeignPath node to tell the planner
 	 * that.
 	 */
+
+	/* Should we add a partial path to enable a parallel scan? */
+	if (baserel->consider_parallel && !fdw_private->is_program)
+	{
+		partial_path = create_foreignscan_path(root, baserel, NULL,
+											   baserel->rows,
+											   startup_cost,
+											   total_cost,
+											   NIL, NULL, NULL, coptions);
+
+		parallel_workers = compute_parallel_worker(baserel,
+												   fdw_private->pages, -1,
+												   max_parallel_workers_per_gather);
+		partial_path->path.parallel_workers = parallel_workers;
+		partial_path->path.parallel_aware = true;
+		partial_path->path.parallel_safe = true;
+		parallel_divisor = get_parallel_divisor(&partial_path->path);
+		partial_path->path.rows /= parallel_divisor;
+		partial_path->path.total_cost = startup_cost +
+			((total_cost - startup_cost) / parallel_divisor);
+		if (parallel_workers > 0)
+			add_partial_path(baserel, (Path *) partial_path);
+	}
 }
 
 /*
@@ -689,7 +737,7 @@ fileBeginForeignScan(ForeignScanState *node, int eflags)
 	 * Save state in node->fdw_state.  We must save enough information to call
 	 * BeginCopyFrom() again.
 	 */
-	festate = (FileFdwExecutionState *) palloc(sizeof(FileFdwExecutionState));
+	festate = (FileFdwExecutionState *) palloc0(sizeof(FileFdwExecutionState));
 	festate->filename = filename;
 	festate->is_program = is_program;
 	festate->options = options;
@@ -1223,3 +1271,39 @@ file_acquire_sample_rows(Relation onerel, int elevel,
 
 	return numrows;
 }
+
+static Size
+fileEstimateDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt)
+{
+	return sizeof(pg_atomic_uint64);
+}
+
+static void
+fileInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+							 void *coordinate)
+{
+	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
+
+	EnableParallelCopy(festate->cstate, PARALLEL_STEP_SIZE,
+					   (pg_atomic_uint64 *) coordinate);
+}
+
+static void
+fileReInitializeDSMForeignScan(ForeignScanState *node, ParallelContext *pcxt,
+							   void *coordinate)
+{
+	pg_atomic_uint64 *next_parallel_step_offset =
+		(pg_atomic_uint64 *) coordinate;
+
+	pg_atomic_write_u64(next_parallel_step_offset, 0);
+}
+
+static void
+fileInitializeWorkerForeignScan(ForeignScanState *node, shm_toc *toc,
+								void *coordinate)
+{
+	FileFdwExecutionState *festate = (FileFdwExecutionState *) node->fdw_state;
+
+	EnableParallelCopy(festate->cstate, PARALLEL_STEP_SIZE,
+					   (pg_atomic_uint64 *) coordinate);
+}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index c39218f8db..44d5613504 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -188,6 +188,13 @@ typedef struct CopyStateData
 
 	TransitionCaptureState *transition_capture;
 
+	/* For use by file_fdw's parallel scans. */
+	uint64		current_offset;
+	uint64		start_offset;	/* start of current chunk */
+	uint64		last_offset;	/* one past the current chunk */
+	uint64		parallel_step_size;
+	pg_atomic_uint64 *next_parallel_step_offset;
+
 	/*
 	 * These variables are used to reduce overhead in textual COPY FROM.
 	 *
@@ -305,6 +312,7 @@ if (1) \
 { \
 	if (raw_buf_ptr + (extralen) >= copy_buf_len && !hit_eof) \
 	{ \
+		CONSUME_BYTES(prev_raw_ptr - raw_buf_ptr); \
 		raw_buf_ptr = prev_raw_ptr; /* undo fetch */ \
 		need_data = true; \
 		continue; \
@@ -318,7 +326,10 @@ if (1) \
 	if (raw_buf_ptr + (extralen) >= copy_buf_len && hit_eof) \
 	{ \
 		if (extralen) \
+		{ \
+			CONSUME_BYTES(copy_buf_len - raw_buf_ptr); \
 			raw_buf_ptr = copy_buf_len; /* consume the partial character */ \
+		} \
 		/* backslash just before EOF, treat as data char */ \
 		result = true; \
 		break; \
@@ -341,10 +352,15 @@ if (1) \
 	} \
 } else ((void) 0)
 
+/* Update the offset of the byte within the file. */
+#define CONSUME_BYTES(n) \
+	cstate->current_offset += (n)
+
 /* Undo any read-ahead and jump out of the block. */
 #define NO_END_OF_COPY_GOTO \
 if (1) \
 { \
+	CONSUME_BYTES((prev_raw_ptr + 1) - raw_buf_ptr); \
 	raw_buf_ptr = prev_raw_ptr + 1; \
 	goto not_end_of_copy; \
 } else ((void) 0)
@@ -379,6 +395,7 @@ static void CopyAttributeOutCSV(CopyState cstate, char *string,
 static List *CopyGetAttnums(TupleDesc tupDesc, Relation rel,
 			   List *attnamelist);
 static char *limit_printout_length(const char *str);
+static void ParallelCopyStep(CopyState cstate);
 
 /* Low-level communications functions */
 static void SendCopyBegin(CopyState cstate);
@@ -3374,6 +3391,68 @@ CopyFrom(CopyState cstate)
 	return processed;
 }
 
+/*
+ * Enable parallel scan while copying from a file.  Currently used only by
+ * file_fdw.
+ */
+void
+EnableParallelCopy(CopyState cstate,
+				   uint64 parallel_step_size,
+				   pg_atomic_uint64 *next_parallel_step_offset)
+{
+	cstate->start_offset = 0;
+	cstate->last_offset = 0;
+	cstate->current_offset = 0;
+	cstate->parallel_step_size = parallel_step_size;
+	cstate->next_parallel_step_offset = next_parallel_step_offset;
+}
+
+/*
+ * Find a new chunk of data to read.
+ */
+void
+ParallelCopyStep(CopyState cstate)
+{
+	for (;;)
+	{
+		/* Take the next available chunk using the shared cursor. */
+		cstate->start_offset = cstate->current_offset =
+			pg_atomic_fetch_add_u64(cstate->next_parallel_step_offset,
+									cstate->parallel_step_size);
+		cstate->last_offset = cstate->start_offset +
+			cstate->parallel_step_size;
+		cstate->raw_buf_index = cstate->raw_buf_len = 0;
+
+		/* Seek to that position in the file. */
+		if (fseek(cstate->copy_file, cstate->start_offset, SEEK_SET) < 0)
+			elog(ERROR, "cannot seek within file: %m");
+
+		/*
+		 * Except for the very first page, we need to discard the first line we
+		 * find.  That's because it could be a partial line.  Whichever process
+		 * is working on the previous chunk will handle it.
+		 */
+		if (cstate->start_offset > 0)
+		{
+			cstate->cur_lineno++;
+			CopyReadLine(cstate);
+
+			/*
+			 * If the tail of the discarded line was bigger than one chunk,
+			 * then we need to find a new chunk.  In other words, if
+			 * several processes find themselves at different places in the
+			 * middle of the same very long line, only the one that started
+			 * on the same chunk as the *end* of the line gets to process the
+			 * rest of that chunk.  Everyone else has to go around and find a
+			 * new chunk.
+			 */
+			if (cstate->current_offset > cstate->last_offset)
+				continue;
+		}
+		break;
+	}
+}
+
 /*
  * Setup to read tuples from a file for COPY FROM.
  *
@@ -3642,8 +3721,19 @@ NextCopyFromRawFields(CopyState cstate, char ***fields, int *nfields)
 	/* only available for text or csv input */
 	Assert(!cstate->binary);
 
+	/*
+	 * If parallel copy, make sure we have a chunk.  Note that we are
+	 * responsible for reading a line that begins exactly on the first byte of
+	 * the next chunk, too, because the owner of the next chunk will skip it
+	 * (it can't tell if that's a whole line or a partial line).
+	 */
+	if (cstate->next_parallel_step_offset &&
+		(cstate->current_offset > cstate->last_offset || cstate->last_offset == 0))
+		ParallelCopyStep(cstate);
+
 	/* on input just throw the header line away */
-	if (cstate->cur_lineno == 0 && cstate->header_line)
+	if (cstate->start_offset == 0 &&
+		cstate->cur_lineno == 0 && cstate->header_line)
 	{
 		cstate->cur_lineno++;
 		if (CopyReadLine(cstate))
@@ -4075,6 +4165,7 @@ CopyReadLineText(CopyState cstate)
 		/* OK to fetch a character */
 		prev_raw_ptr = raw_buf_ptr;
 		c = copy_raw_buf[raw_buf_ptr++];
+		CONSUME_BYTES(1);
 
 		if (cstate->csv_mode)
 		{
@@ -4138,6 +4229,7 @@ CopyReadLineText(CopyState cstate)
 				if (c == '\n')
 				{
 					raw_buf_ptr++;	/* eat newline */
+					CONSUME_BYTES(1);
 					cstate->eol_type = EOL_CRNL;	/* in case not set yet */
 				}
 				else
@@ -4212,6 +4304,7 @@ CopyReadLineText(CopyState cstate)
 			if (c2 == '.')
 			{
 				raw_buf_ptr++;	/* consume the '.' */
+				CONSUME_BYTES(1);
 
 				/*
 				 * Note: if we loop back for more data here, it does not
@@ -4224,6 +4317,7 @@ CopyReadLineText(CopyState cstate)
 					IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
 					/* if hit_eof, c2 will become '\0' */
 					c2 = copy_raw_buf[raw_buf_ptr++];
+					CONSUME_BYTES(1);
 
 					if (c2 == '\n')
 					{
@@ -4249,6 +4343,7 @@ CopyReadLineText(CopyState cstate)
 				IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(0);
 				/* if hit_eof, c2 will become '\0' */
 				c2 = copy_raw_buf[raw_buf_ptr++];
+				CONSUME_BYTES(1);
 
 				if (c2 != '\r' && c2 != '\n')
 				{
@@ -4295,6 +4390,7 @@ CopyReadLineText(CopyState cstate)
 				 * so we don't increment in those cases.
 				 */
 				raw_buf_ptr++;
+				CONSUME_BYTES(1);
 		}
 
 		/*
@@ -4327,6 +4423,7 @@ not_end_of_copy:
 			IF_NEED_REFILL_AND_NOT_EOF_CONTINUE(mblen - 1);
 			IF_NEED_REFILL_AND_EOF_BREAK(mblen - 1);
 			raw_buf_ptr += mblen - 1;
+			CONSUME_BYTES(mblen - 1);
 		}
 		first_char_in_line = false;
 	}							/* end of outer loop */
diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c
index afd32884a2..1c7421c9ed 100644
--- a/src/backend/optimizer/path/costsize.c
+++ b/src/backend/optimizer/path/costsize.c
@@ -176,7 +176,6 @@ static Cost append_nonpartial_cost(List *subpaths, int numpaths,
 static void set_rel_width(PlannerInfo *root, RelOptInfo *rel);
 static double relation_byte_size(double tuples, int width);
 static double page_size(double tuples, int width);
-static double get_parallel_divisor(Path *path);
 
 
 /*
@@ -5478,7 +5477,7 @@ page_size(double tuples, int width)
  * Estimate the fraction of the work that each worker will do given the
  * number of workers budgeted for the path.
  */
-static double
+double
 get_parallel_divisor(Path *path)
 {
 	double		parallel_divisor = path->parallel_workers;
diff --git a/src/include/commands/copy.h b/src/include/commands/copy.h
index c70556137c..63259db2a9 100644
--- a/src/include/commands/copy.h
+++ b/src/include/commands/copy.h
@@ -41,4 +41,8 @@ extern uint64 CopyFrom(CopyState cstate);
 
 extern DestReceiver *CreateCopyDestReceiver(void);
 
+extern void EnableParallelCopy(CopyState cstate,
+							   uint64 parallel_step_size,
+							   pg_atomic_uint64 *next_parallel_step_offset);
+
 #endif							/* COPY_H */
diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h
index ac6de0f6be..2a81c3cae8 100644
--- a/src/include/optimizer/cost.h
+++ b/src/include/optimizer/cost.h
@@ -198,5 +198,6 @@ extern void set_foreign_size_estimates(PlannerInfo *root, RelOptInfo *rel);
 extern PathTarget *set_pathtarget_cost_width(PlannerInfo *root, PathTarget *target);
 extern double compute_bitmap_pages(PlannerInfo *root, RelOptInfo *baserel,
 					 Path *bitmapqual, int loop_count, Cost *cost, double *tuple);
+extern double get_parallel_divisor(Path *path);
 
 #endif							/* COST_H */
-- 
2.21.0

