On Tue, Nov 3, 2020 at 2:28 PM Amit Kapila <amit.kapil...@gmail.com> wrote:
>
> On Mon, Nov 2, 2020 at 12:40 PM Heikki Linnakangas <hlinn...@iki.fi> wrote:
> >
> > On 02/11/2020 08:14, Amit Kapila wrote:
> > > On Fri, Oct 30, 2020 at 10:11 PM Heikki Linnakangas <hlinn...@iki.fi> 
> > > wrote:
> > >>
> > >> In this design, you don't need to keep line boundaries in shared memory,
> > >> because each worker process is responsible for finding the line
> > >> boundaries of its own block.
> > >>
> > >> There's a point of serialization here, in that the next block cannot be
> > >> processed, until the worker working on the previous block has finished
> > >> scanning the EOLs, and set the starting position on the next block,
> > >> putting it in READY state. That's not very different from your patch,
> > >> where you had a similar point of serialization because the leader
> > >> scanned the EOLs,
> > >
> > > But in the design (single producer multiple consumer) used by the
> > > patch, the worker doesn't need to wait till the complete block is
> > > processed; it can start processing the lines already found. This also
> > > allows workers to start processing the data much earlier, as they
> > > don't need to wait for all the offsets corresponding to the 64K block
> > > to be ready. However, in the design where each worker processes a 64K
> > > block, it can lead to much longer waits. I think this will impact the
> > > COPY FROM STDIN case more, where in most cases (200-300 byte tuples)
> > > we receive data line by line from the client and the leader finds the
> > > line endings. If the leader doesn't find the line endings, the workers
> > > need to wait till the leader fills the entire 64K chunk; OTOH, with
> > > the current approach a worker can start as soon as the leader has
> > > populated some minimum number of line endings.
> >
> > You can use a smaller block size.
> >
>
> Sure, but the same problem can happen if the last line in that block
> is too long and we need to peek into the next block. And there could
> be cases where a single line is greater than 64K.
>
> > However, the point of parallel copy is
> > to maximize bandwidth.
> >
>
> Okay, but this first phase (finding the line boundaries) cannot be
> done in parallel anyway, and we have seen in some of the initial
> benchmarking that this initial phase is a small part of the work,
> especially when the table has indexes, constraints, etc. So I think
> it won't matter much whether this splitting is done in a single
> process or in multiple processes.
>

I wrote a patch to compare the performance of the current
implementation, where the leader identifies the line boundaries,
against the design where the workers identify the line boundaries.
The results are given below. Each tuple should be read as: (parallel
copy time in seconds with the leader identifying the line boundaries,
parallel copy time in seconds with the workers identifying the line
boundaries, number of workers).

Use case 1 - 10 million rows, 5.2GB data, 3 indexes on integer columns:
(211.206, 632.583, 1), (165.402, 360.152, 2), (137.608, 219.623, 4),
(128.003, 206.851, 8), (114.518, 177.790, 16), (109.257, 170.058, 20),
(102.050, 158.376, 30)

Use case 2 - 10 million rows, 5.2GB data, 2 indexes on integer columns,
1 index on text column, csv file:
(1212.356, 1602.118, 1), (707.191, 849.105, 2), (369.620, 441.068, 4),
(221.359, 252.775, 8), (167.152, 180.207, 16), (168.804, 181.986, 20),
(172.320, 194.875, 30)

Use case 3 - 10 million rows, 5.2GB data, no indexes:
(96.317, 437.453, 1), (70.730, 240.517, 2), (64.436, 197.604, 4),
(67.186, 175.630, 8), (76.561, 156.015, 16), (81.025, 150.687, 20),
(86.578, 148.481, 30)

Use case 4 - 10000 records, 9.6GB, toast data:
(147.076, 276.323, 1), (101.610, 141.893, 2), (100.703, 134.096, 4),
(112.583, 134.765, 8), (101.898, 135.789, 16), (109.258, 135.625, 20),
(109.219, 136.144, 30)

Attached is the patch that was used for these tests. The patch is
written on top of the parallel copy patch.
The design that Amit, Andres, and I voted for, i.e. the leader
identifying the line boundaries and sharing them in shared memory, is
performing better.

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
From dd9b6be19573b5391d01373b53e64a5c1dc305fd Mon Sep 17 00:00:00 2001
From: Vignesh C <vignes...@gmail.com>
Date: Mon, 28 Dec 2020 15:00:48 +0530
Subject: [PATCH v12 7/7] Parallel copy based on workers identifying line
 boundary.

Parallel copy based on workers identifying line boundary.
---
 src/backend/commands/copyfromparse.c     |  93 +++----
 src/backend/commands/copyparallel.c      | 441 +++++++++++++++++--------------
 src/include/commands/copyfrom_internal.h |  38 ++-
 src/test/regress/expected/copy2.out      |   2 +-
 4 files changed, 318 insertions(+), 256 deletions(-)

diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c
index a767bae..1d79da9 100644
--- a/src/backend/commands/copyfromparse.c
+++ b/src/backend/commands/copyfromparse.c
@@ -82,13 +82,15 @@ if (1) \
 { \
 	if (raw_buf_ptr > cstate->raw_buf_index) \
 	{ \
-		if (!IsParallelCopy()) \
-			appendBinaryStringInfo(&cstate->line_buf, \
-								   cstate->raw_buf + cstate->raw_buf_index, \
-								   raw_buf_ptr - cstate->raw_buf_index); \
-		else \
-			line_size +=  raw_buf_ptr - cstate->raw_buf_index; \
-		\
+		appendBinaryStringInfo(&cstate->line_buf, \
+							   cstate->raw_buf + cstate->raw_buf_index, \
+							   raw_buf_ptr - cstate->raw_buf_index); \
+		if (IsParallelCopy()) \
+		{ \
+			ParallelCopyDataBlock *curr_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[cstate->pcdata->first_blk_info.current_block]; \
+			curr_data_blk_ptr->skip_bytes += raw_buf_ptr - cstate->raw_buf_index; \
+			cstate->pcdata->first_blk_info.start_offset += raw_buf_ptr - cstate->raw_buf_index; \
+		} \
 		cstate->raw_buf_index = raw_buf_ptr; \
 	} \
 } else ((void) 0)
@@ -120,7 +122,6 @@ int			CopyGetData(CopyFromState cstate, void *databuf,
 						int minread, int maxread);
 static inline bool CopyGetInt32(CopyFromState cstate, int32 *val);
 static inline bool CopyGetInt16(CopyFromState cstate, int16 *val);
-static bool CopyLoadRawBuf(CopyFromState cstate);
 int			CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes);
 static void ClearEOLFromCopiedData(CopyFromState cstate, char *copy_line_data,
 								   int copy_line_pos, int *copy_line_size);
@@ -389,7 +390,7 @@ CopyGetInt16(CopyFromState cstate, int16 *val)
  * of the buffer and then we load more data after that.  This case occurs only
  * when a multibyte character crosses a bufferload boundary.
  */
-static bool
+bool
 CopyLoadRawBuf(CopyFromState cstate)
 {
 	int			nbytes = (!IsParallelCopy()) ? RAW_BUF_BYTES(cstate) : cstate->raw_buf_len;
@@ -725,7 +726,7 @@ CopyReadLine(CopyFromState cstate)
 		 * after \. up to the protocol end of copy data.  (XXX maybe better
 		 * not to treat \. as special?)
 		 */
-		if (cstate->copy_src == COPY_NEW_FE)
+		if (cstate->copy_src == COPY_NEW_FE && !IsParallelCopy())
 		{
 			bool		bIsFirst = true;
 
@@ -809,7 +810,7 @@ void
 ConvertToServerEncoding(CopyFromState cstate)
 {
 	/* Done reading the line.  Convert it to server encoding. */
-	if (cstate->need_transcoding && (!IsParallelCopy() || IsWorker()))
+	if (cstate->need_transcoding)
 	{
 		char	   *cvt;
 
@@ -850,10 +851,6 @@ CopyReadLineText(CopyFromState cstate)
 	char		quotec = '\0';
 	char		escapec = '\0';
 
-	/* For parallel copy */
-	int			line_size = 0;
-	uint32		line_pos = 0;
-
 	cstate->eol_type = EOL_UNKNOWN;
 
 	if (cstate->opts.csv_mode)
@@ -910,18 +907,23 @@ CopyReadLineText(CopyFromState cstate)
 		if (raw_buf_ptr >= copy_buf_len || need_data)
 		{
 			REFILL_LINEBUF;
-			if ((copy_buf_len == DATA_BLOCK_SIZE || copy_buf_len == 0) &&
-				IsParallelCopy())
-				SetRawBufForLoad(cstate, line_size, copy_buf_len, raw_buf_ptr,
-								 &copy_raw_buf);
 
-			/*
-			 * Try to read some more data.  This will certainly reset
-			 * raw_buf_index to zero, and raw_buf_ptr must go with it.
-			 */
-			if (!CopyLoadRawBuf(cstate))
-				hit_eof = true;
-			raw_buf_ptr = (IsParallelCopy()) ? cstate->raw_buf_index : 0;
+			if (IsParallelCopy())
+			{
+				if (!GetWorkerBlockPos(cstate, &copy_raw_buf))
+					hit_eof = true;
+			}
+			else
+			{
+				/*
+				 * Try to read some more data.  This will certainly reset
+				 * raw_buf_index to zero, and raw_buf_ptr must go with it.
+				 */
+				if (!CopyLoadRawBuf(cstate))
+					hit_eof = true;
+			}
+
+			raw_buf_ptr = 0;
 			copy_buf_len = cstate->raw_buf_len;
 
 			/*
@@ -1139,12 +1141,15 @@ CopyReadLineText(CopyFromState cstate)
 				 */
 				if (prev_raw_ptr > cstate->raw_buf_index)
 				{
-					if (!IsParallelCopy())
-						appendBinaryStringInfo(&cstate->line_buf,
-											   cstate->raw_buf + cstate->raw_buf_index,
-											   prev_raw_ptr - cstate->raw_buf_index);
-					else
-						line_size += prev_raw_ptr - cstate->raw_buf_index;
+					appendBinaryStringInfo(&cstate->line_buf,
+										   cstate->raw_buf + cstate->raw_buf_index,
+										   prev_raw_ptr - cstate->raw_buf_index);
+					if (IsParallelCopy())
+					{
+						ParallelCopyDataBlock *curr_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[cstate->pcdata->first_blk_info.current_block];
+						curr_data_blk_ptr->skip_bytes += raw_buf_ptr - cstate->raw_buf_index;
+						cstate->pcdata->first_blk_info.start_offset += prev_raw_ptr - cstate->raw_buf_index;
+					}
 				}
 
 				cstate->raw_buf_index = raw_buf_ptr;
@@ -1199,20 +1204,6 @@ not_end_of_copy:
 			raw_buf_ptr += mblen - 1;
 		}
 
-		/*
-		 * Skip the header line. Update the line here, this cannot be done at
-		 * the beginning, as there is a possibility that file contains empty
-		 * lines.
-		 */
-		if (IsParallelCopy() && first_char_in_line && !IsHeaderLine())
-		{
-			ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
-
-			line_pos = UpdateSharedLineInfo(cstate,
-											pcshared_info->cur_block_pos,
-											cstate->raw_buf_index, -1,
-											LINE_LEADER_POPULATING, -1);
-		}
 
 		first_char_in_line = false;
 	}							/* end of outer loop */
@@ -1223,15 +1214,15 @@ not_end_of_copy:
 	REFILL_LINEBUF;
 	if (!result && !IsHeaderLine())
 	{
-		if (IsParallelCopy())
-			ClearEOLFromCopiedData(cstate, cstate->raw_buf, raw_buf_ptr,
-								   &line_size);
-		else
+		//if (IsParallelCopy())
+		//	ClearEOLFromCopiedData(cstate, cstate->raw_buf, raw_buf_ptr,
+							//	   &line_size);
+		//else
 			ClearEOLFromCopiedData(cstate, cstate->line_buf.data,
 								   cstate->line_buf.len, &cstate->line_buf.len);
 	}
 
-	EndLineParallelCopy(cstate, line_pos, line_size, raw_buf_ptr);
+	//EndLineParallelCopy(cstate, line_pos, line_size, raw_buf_ptr);
 	return result;
 }
 
diff --git a/src/backend/commands/copyparallel.c b/src/backend/commands/copyparallel.c
index 5149798..d9704a1 100644
--- a/src/backend/commands/copyparallel.c
+++ b/src/backend/commands/copyparallel.c
@@ -83,6 +83,9 @@
 	ResetLatch(MyLatch); \
 }
 
+static void SetWorkerBlockPos(CopyFromState cstate, uint32 first_block,
+							  uint32 start_offset, uint64 cur_lineno);
+
 /*
  * Estimate1ByteStrSize
  *
@@ -383,7 +386,15 @@ PopulateParallelCopyShmInfo(ParallelCopyShmInfo *shared_info_ptr)
 		ParallelCopyLineBoundary *lineInfo = &shared_info_ptr->line_boundaries.ring[count];
 
 		pg_atomic_init_u32(&(lineInfo->line_size), -1);
+		lineInfo->first_block = 0;
+		lineInfo->start_offset = 0;
 	}
+
+	shared_info_ptr->line_boundaries.ring[0].first_block = 0;
+	shared_info_ptr->line_boundaries.ring[0].current_block = 0;
+	shared_info_ptr->line_boundaries.ring[0].start_offset = COPY_BLOCK_RESERVE_BYTES;
+	shared_info_ptr->line_boundaries.ring[0].cur_lineno = 1;
+	shared_info_ptr->line_boundaries.pos_filled = true;
 }
 
 /*
@@ -445,6 +456,8 @@ CheckExprParallelSafety(CopyFromState cstate)
 bool
 IsParallelCopyAllowed(CopyFromState cstate, Oid relid)
 {
+	if (cstate->opts.binary)
+		return false;
 	/*
 	 * Check if parallel operation can be performed based on local
 	 * table/foreign table/index/check constraints/triggers present for the
@@ -612,10 +625,14 @@ InitializeParallelCopyInfo(CopyFromState cstate, List *attnamelist)
 	cstate->reached_eof = false;
 	cstate->eol_type = EOL_UNKNOWN;
 	cstate->cur_relname = RelationGetRelationName(cstate->rel);
-	cstate->cur_lineno = 0;
+	cstate->cur_lineno = 1;
 	cstate->cur_attname = NULL;
 	cstate->cur_attval = NULL;
-	cstate->pcdata->curr_data_block = NULL;
+	pcdata->curr_data_block = NULL;
+	pcdata->first_blk_info.first_block = -1;
+	pcdata->first_blk_info.current_block = -1;
+	pcdata->first_blk_info.start_offset = 0;
+	pcdata->first_blk_info.cur_lineno = 1;
 
 	/* Set up variables to avoid per-attribute overhead. */
 	initStringInfo(&cstate->attribute_buf);
@@ -641,167 +658,32 @@ InitializeParallelCopyInfo(CopyFromState cstate, List *attnamelist)
 }
 
 /*
- * UpdateLineInfo
- *
- * Update line information & return.
- */
-static bool
-UpdateLineInfo(ParallelCopyShmInfo *pcshared_info,
-			   ParallelCopyLineBoundary *lineInfo, uint32 write_pos)
-{
-	elog(DEBUG1, "[Worker] Completed processing line:%u", write_pos);
-	pg_atomic_write_u32(&lineInfo->line_state, LINE_WORKER_PROCESSED);
-	pg_atomic_write_u32(&lineInfo->line_size, -1);
-	pg_atomic_add_fetch_u64(&pcshared_info->total_worker_processed, 1);
-	return false;
-}
-
-/*
  * CacheLineInfo
  *
  * Cache the line information to local memory.
  */
 static bool
-CacheLineInfo(CopyFromState cstate, uint32 buff_count, uint32 line_pos)
+CacheLineInfo(CopyFromState cstate, uint32 line_pos)
 {
-	ParallelCopyShmInfo *pcshared_info = cstate->pcdata->pcshared_info;
+	bool done;
 	ParallelCopyData *pcdata = cstate->pcdata;
-	uint32		write_pos;
-	ParallelCopyDataBlock *data_blk_ptr;
-	ParallelCopyLineBoundary *lineInfo;
-	uint32		offset;
-	uint32		dataSize;
-	uint32		copiedSize = 0;
-
-	resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf);
-	write_pos = GetLinePosition(cstate, line_pos);
-	if (-1 == write_pos)
-		return true;
-
-	/* Get the current line information. */
-	lineInfo = &pcshared_info->line_boundaries.ring[write_pos];
-	if (pg_atomic_read_u32(&lineInfo->line_size) == 0)
-		return UpdateLineInfo(pcshared_info, lineInfo, write_pos);
-
-	/* Get the block information. */
-	data_blk_ptr = &pcshared_info->data_blocks[lineInfo->first_block];
-
-	/* Get the offset information from where the data must be copied. */
-	offset = lineInfo->start_offset;
-	pcdata->worker_line_buf[buff_count].cur_lineno = lineInfo->cur_lineno;
-
-	elog(DEBUG1, "[Worker] Processing - line position:%u, block:%u, unprocessed lines:%u, offset:%u, line size:%u",
-		 write_pos, lineInfo->first_block,
-		 pg_atomic_read_u32(&data_blk_ptr->unprocessed_line_parts),
-		 offset, pg_atomic_read_u32(&lineInfo->line_size));
-
-	for (;;)
-	{
-		uint8		skip_bytes = data_blk_ptr->skip_bytes;
-
-		/*
-		 * There is a possibility that the loop embedded at the bottom of the
-		 * current loop has come out because data_blk_ptr->curr_blk_completed
-		 * is set, but dataSize read might be an old value, if
-		 * data_blk_ptr->curr_blk_completed and the line is completed,
-		 * line_size will be set. Read the line_size again to be sure if it is
-		 * completed or partial block.
-		 */
-		dataSize = pg_atomic_read_u32(&lineInfo->line_size);
-		if (dataSize != -1)
-		{
-			uint32		remainingSize = dataSize - copiedSize;
-
-			if (!remainingSize)
-				break;
-
-			/* Whole line is in current block. */
-			if (remainingSize + offset + skip_bytes < DATA_BLOCK_SIZE)
-			{
-				appendBinaryStringInfo(&pcdata->worker_line_buf[buff_count].line_buf,
-									   &data_blk_ptr->data[offset],
-									   remainingSize);
-				pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts,
-										1);
-				break;
-			}
-			else
-			{
-				/* Line is spread across the blocks. */
-				uint32		lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset;
-
-				appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf,
-										 &data_blk_ptr->data[offset],
-										 lineInCurrentBlock);
-				pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1);
-				copiedSize += lineInCurrentBlock;
-				while (copiedSize < dataSize)
-				{
-					uint32		currentBlockCopySize;
-					ParallelCopyDataBlock *currBlkPtr = &pcshared_info->data_blocks[data_blk_ptr->following_block];
-
-					skip_bytes = currBlkPtr->skip_bytes;
-
-					/*
-					 * If complete data is present in current block use
-					 * dataSize - copiedSize, or copy the whole block from
-					 * current block.
-					 */
-					currentBlockCopySize = Min(dataSize - copiedSize, DATA_BLOCK_SIZE - skip_bytes);
-					appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf,
-											 &currBlkPtr->data[0],
-											 currentBlockCopySize);
-					pg_atomic_sub_fetch_u32(&currBlkPtr->unprocessed_line_parts, 1);
-					copiedSize += currentBlockCopySize;
-					data_blk_ptr = currBlkPtr;
-				}
+	cstate->line_buf = pcdata->worker_line_buf[line_pos].line_buf;
 
-				break;
-			}
-		}
-		else if (data_blk_ptr->curr_blk_completed)
-		{
-			/* Copy this complete block from the current offset. */
-			uint32		lineInCurrentBlock = (DATA_BLOCK_SIZE - skip_bytes) - offset;
-
-			appendBinaryStringInfoNT(&pcdata->worker_line_buf[buff_count].line_buf,
-									 &data_blk_ptr->data[offset],
-									 lineInCurrentBlock);
-			pg_atomic_sub_fetch_u32(&data_blk_ptr->unprocessed_line_parts, 1);
-			copiedSize += lineInCurrentBlock;
-
-			/*
-			 * Reset the offset. For the first copy, copy from the offset. For
-			 * the subsequent copy the complete block.
-			 */
-			offset = 0;
-
-			/* Set data_blk_ptr to the following block. */
-			data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block];
-		}
+	/* Actually read the line into memory here */
+	done = CopyReadLine(cstate);
 
-		for (;;)
-		{
-			/* Get the size of this line */
-			dataSize = pg_atomic_read_u32(&lineInfo->line_size);
+	pcdata->worker_line_buf[line_pos].line_buf = cstate->line_buf;
+	pcdata->worker_line_buf[line_pos].cur_lineno = cstate->cur_lineno;
 
-			/*
-			 * If the data is present in current block lineInfo->line_size
-			 * will be updated. If the data is spread across the blocks either
-			 * of lineInfo->line_size or data_blk_ptr->curr_blk_completed can
-			 * be updated. lineInfo->line_size will be updated if the complete
-			 * read is finished. data_blk_ptr->curr_blk_completed will be
-			 * updated if processing of current block is finished and data
-			 * processing is not finished.
-			 */
-			if (data_blk_ptr->curr_blk_completed || (dataSize != -1))
-				break;
-
-			COPY_WAIT_TO_PROCESS()
-		}
-	}
+	/*
+	 * EOF at start of line means we're done.  If we see EOF after some
+	 * characters, we act as though it was newline followed by EOF, ie,
+	 * process the line and then exit loop on next iteration.
+	 */
+	if (done && cstate->line_buf.len == 0)
+		return false;
 
-	return UpdateLineInfo(pcshared_info, lineInfo, write_pos);
+	return true;
 }
 
 /*
@@ -835,10 +717,9 @@ GetCachedLine(CopyFromState cstate, ParallelCopyData *pcdata)
 bool
 GetWorkerLine(CopyFromState cstate)
 {
-	uint32		buff_count;
 	ParallelCopyData *pcdata = cstate->pcdata;
-	ParallelCopyLineBoundaries *line_boundaries = &pcdata->pcshared_info->line_boundaries;
-	uint32 worker_pos;
+	uint32 first_block = pcdata->first_blk_info.first_block;
+	uint32 current_block = pcdata->first_blk_info.current_block;
 
 	/*
 	 * Copy the line data to line_buf and release the line position so that
@@ -850,22 +731,32 @@ GetWorkerLine(CopyFromState cstate)
 	pcdata->worker_line_buf_pos = 0;
 	pcdata->worker_line_buf_count = 0;
 
-	SpinLockAcquire(&line_boundaries->worker_pos_lock);
-	worker_pos = line_boundaries->worker_pos;
-	line_boundaries->worker_pos = (line_boundaries->worker_pos +
-								   WORKER_CHUNK_COUNT) % RINGSIZE;
-	SpinLockRelease(&line_boundaries->worker_pos_lock);
-
-	for (buff_count = 0; buff_count < WORKER_CHUNK_COUNT; buff_count++)
+	/* Loop while the worker stays on the same block and the line buffer is not full. */
+	while (first_block == current_block &&
+		   pcdata->worker_line_buf_count < WORKER_CHUNK_COUNT)
 	{
-		bool result = CacheLineInfo(cstate, buff_count,
-									worker_pos + buff_count);
-		if (result)
+		bool result = CacheLineInfo(cstate, pcdata->worker_line_buf_count);
+		if (!result)
 			break;
 
+		if (cstate->cur_lineno == 1 && cstate->opts.header_line)
+		{
+			/* on input just throw the header line away */
+			continue;
+		}
+
+		cstate->cur_lineno++;
 		pcdata->worker_line_buf_count++;
 	}
 
+	if (pcdata->first_blk_info.current_block != -1)
+		SetWorkerBlockPos(cstate, pcdata->first_blk_info.current_block,
+						cstate->pcdata->first_blk_info.start_offset,
+						cstate->cur_lineno);
+	cstate->raw_buf = 0;
+	cstate->raw_buf_index = 0;
+	cstate->raw_buf_len = 0;
+
 	if (pcdata->worker_line_buf_count > 0)
 		return GetCachedLine(cstate, pcdata);
 	else
@@ -1053,17 +944,7 @@ ParallelCopyFrom(CopyFromState cstate)
 
 	if (!cstate->opts.binary)
 	{
-		/* On input just throw the header line away. */
-		if (cstate->cur_lineno == 0 && cstate->opts.header_line)
-		{
-			cstate->cur_lineno++;
-			if (CopyReadLine(cstate))
-			{
-				pcshared_info->is_read_in_progress = false;
-				return;				/* done */
-			}
-		}
-
+		/* Get data block by block until read is completed. */
 		for (;;)
 		{
 			bool		done;
@@ -1073,14 +954,14 @@ ParallelCopyFrom(CopyFromState cstate)
 			cstate->cur_lineno++;
 
 			/* Actually read the line into memory here. */
-			done = CopyReadLine(cstate);
+			done = ParallelCopyGetData(cstate);
 
 			/*
 			 * EOF at start of line means we're done.  If we see EOF after
 			 * some characters, we act as though it was newline followed by
 			 * EOF, ie, process the line and then exit loop on next iteration.
 			 */
-			if (done && cstate->line_buf.len == 0)
+			if (done)
 				break;
 		}
 	}
@@ -1489,11 +1370,8 @@ GetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info)
 	while (count < (MAX_BLOCKS_COUNT - 1))
 	{
 		ParallelCopyDataBlock *dataBlkPtr = &pcshared_info->data_blocks[block_pos];
-		uint32		unprocessed_line_parts = pg_atomic_read_u32(&dataBlkPtr->unprocessed_line_parts);
-
-		if (unprocessed_line_parts == 0)
+		if (pg_atomic_read_u32(&dataBlkPtr->data_blk_state) == FREE)
 		{
-			dataBlkPtr->curr_blk_completed = false;
 			dataBlkPtr->skip_bytes = 0;
 			dataBlkPtr->following_block = -1;
 			pcshared_info->cur_block_pos = block_pos;
@@ -1536,8 +1414,7 @@ WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info)
  * Set raw_buf to the shared memory where the file data must be read.
  */
 void
-SetRawBufForLoad(CopyFromState cstate, uint32 line_size, uint32 copy_buf_len,
-				 uint32 raw_buf_ptr, char **copy_raw_buf)
+SetRawBufForLoad(CopyFromState cstate)
 {
 	ParallelCopyShmInfo *pcshared_info;
 	uint32		cur_block_pos;
@@ -1554,33 +1431,199 @@ SetRawBufForLoad(CopyFromState cstate, uint32 line_size, uint32 copy_buf_len,
 	next_data_blk_ptr = &pcshared_info->data_blocks[next_block_pos];
 
 	/* set raw_buf to the data block in shared memory */
-	cstate->raw_buf = next_data_blk_ptr->data;
-	*copy_raw_buf = cstate->raw_buf;
+	cstate->raw_buf = &next_data_blk_ptr->data[COPY_BLOCK_RESERVE_BYTES];
 	if (cur_data_blk_ptr)
 	{
-		if (line_size)
+		cur_data_blk_ptr->following_block = next_block_pos;
+		pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED);
+	}
+
+	cstate->raw_buf_len = 0;
+	cstate->raw_buf_index = 0;
+}
+
+/*
+ * Read the next input line and stash it in line_buf, with conversion to
+ * server encoding.
+ *
+ * Result is true if read was terminated by EOF, false if terminated
+ * by newline.  The terminating newline or EOF marker is not included
+ * in the final value of line_buf.
+ */
+bool
+ParallelCopyGetData(CopyFromState cstate)
+{
+	bool		hit_eof = false;
+	uint32		cur_block_pos;
+	ParallelCopyDataBlock *cur_data_blk_ptr;
+
+	SetRawBufForLoad(cstate);
+	cur_block_pos = cstate->pcdata->pcshared_info->cur_block_pos;
+	cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[cur_block_pos];
+
+	/*
+	 * Try to read some more data.  This will certainly reset
+	 * raw_buf_index to zero, and raw_buf_ptr must go with it.
+	 */
+	if (!CopyLoadRawBuf(cstate))
+		hit_eof = true;
+
+	/*
+	 * If we are completely out of data, break out of the loop,
+	 * reporting EOF.
+	 */
+	if (RAW_BUF_BYTES(cstate) <= 0)
+		return true;
+
+	cur_data_blk_ptr->skip_bytes = DATA_BLOCK_SIZE - cstate->raw_buf_len;
+	pg_atomic_add_fetch_u32(&cstate->pcdata->pcshared_info->total_populated, 1);
+
+	if (hit_eof)
+	{
+		if (cur_data_blk_ptr)
+		{
+			cur_data_blk_ptr->following_block = -1;
+			pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED);
+		}
+	}
+
+	return hit_eof;
+}
+
+bool
+GetWorkerBlockPos(CopyFromState cstate, char **copy_raw_buf)
+{
+	ParallelCopyDataBlock *cur_data_blk_ptr;
+	ParallelCopyLineBoundary *first_blk_info = &cstate->pcdata->first_blk_info;
+	uint32		start_offset;
+	int			nbytes = RAW_BUF_BYTES(cstate);
+	bool		bret = true;
+
+	/* The worker should move to the next block; the first block's information is already available. */
+	if (first_blk_info->first_block != -1)
+	{
+		ParallelCopyDataBlock *prev_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[first_blk_info->current_block];
+		pg_atomic_add_fetch_u32(&cstate->pcdata->pcshared_info->total_processed, 1);
+		pg_atomic_write_u32(&prev_data_blk_ptr->data_blk_state, FREE);
+		start_offset = first_blk_info->start_offset = COPY_BLOCK_RESERVE_BYTES;
+
+		if (prev_data_blk_ptr->following_block != -1)
+		{
+			/*
+			 * The next block to process is recorded in following_block.
+			 */
+			cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[prev_data_blk_ptr->following_block];
+			first_blk_info->current_block = prev_data_blk_ptr->following_block;
+		}
+		else
 		{
 			/*
-			 * Mark the previous block as completed, worker can start copying
-			 * this data.
+			 * This was the last block; it can happen if the worker could not
+			 * decide whether the final bytes of the last block formed an EOL.
+			 * We can get a free block, as the worker will not be doing any
+			 * other work at this time.
 			 */
-			cur_data_blk_ptr->following_block = next_block_pos;
-			pg_atomic_add_fetch_u32(&cur_data_blk_ptr->unprocessed_line_parts, 1);
-			cur_data_blk_ptr->curr_blk_completed = true;
+			uint32 next_block;
+			next_block = WaitGetFreeCopyBlock(cstate->pcdata->pcshared_info);
+			cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[next_block];
+			pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED);
+
+			first_blk_info->current_block = next_block;
+			cur_data_blk_ptr->skip_bytes = DATA_BLOCK_SIZE;
+		}
+
+		if (!cstate->pcdata->pcshared_info->is_read_in_progress &&
+			(pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_processed) ==
+			 pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_populated)))
+		{
+			if (nbytes)
+				bret = false;
+			else
+				return true;
 		}
+	}
+	else
+	{
+		/* Get the block for the worker to process. */
+		for(;;)
+		{
+			COPY_WAIT_TO_PROCESS();
+
+			if (!cstate->pcdata->pcshared_info->is_read_in_progress &&
+				(pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_processed) ==
+				 pg_atomic_read_u32(&cstate->pcdata->pcshared_info->total_populated)))
+				return true;
 
-		cur_data_blk_ptr->skip_bytes = copy_buf_len - raw_buf_ptr;
-		cstate->raw_buf_len = cur_data_blk_ptr->skip_bytes;
+			SpinLockAcquire(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock);
+			if (cstate->pcdata->pcshared_info->line_boundaries.pos_filled == false)
+			{
+				SpinLockRelease(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock);
+				continue;
+			}
 
-		/* Copy the skip bytes to the next block to be processed. */
-		if (cur_data_blk_ptr->skip_bytes)
-			memcpy(cstate->raw_buf, cur_data_blk_ptr->data + raw_buf_ptr,
-				   cur_data_blk_ptr->skip_bytes);
+			first_blk_info->first_block = cstate->pcdata->pcshared_info->line_boundaries.ring[0].first_block;
+			first_blk_info->current_block = cstate->pcdata->pcshared_info->line_boundaries.ring[0].first_block;
+			start_offset = first_blk_info->start_offset = cstate->pcdata->pcshared_info->line_boundaries.ring[0].start_offset;
+			cstate->cur_lineno = first_blk_info->cur_lineno = cstate->pcdata->pcshared_info->line_boundaries.ring[0].cur_lineno;
+			cstate->pcdata->pcshared_info->line_boundaries.pos_filled = false;
+			SpinLockRelease(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock);
+			break;
+		}
+
+		cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[first_blk_info->first_block];
+	}
+
+	/* Wait until the block is populated by the leader. */
+	while (pg_atomic_read_u32(&cur_data_blk_ptr->data_blk_state) != FILLED)
+		COPY_WAIT_TO_PROCESS();
+
+	pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, PROCESSING);
+
+	/* set cstate variables */
+	if (nbytes > 0)
+	{
+		char *raw_buf = &cur_data_blk_ptr->data[start_offset];
+
+		/* Copy down the unprocessed data if any. */
+		memmove(raw_buf, cstate->raw_buf + cstate->raw_buf_index, nbytes);
+
+		cstate->raw_buf = raw_buf;
+		cstate->raw_buf_len = DATA_BLOCK_SIZE - cur_data_blk_ptr->skip_bytes + nbytes;
 	}
 	else
-		cstate->raw_buf_len = 0;
+	{
+		cstate->raw_buf = &cur_data_blk_ptr->data[start_offset];
+		cstate->raw_buf_len = DATA_BLOCK_SIZE - cur_data_blk_ptr->skip_bytes;
+	}
 
 	cstate->raw_buf_index = 0;
+	*copy_raw_buf = cstate->raw_buf;
+	return bret;
+}
+
+static void
+SetWorkerBlockPos(CopyFromState cstate, uint32 first_block,
+				  uint32 start_offset, uint64 cur_lineno)
+{
+	ParallelCopyDataBlock *cur_data_blk_ptr;
+	cur_data_blk_ptr = &cstate->pcdata->pcshared_info->data_blocks[first_block];
+	pg_atomic_write_u32(&cur_data_blk_ptr->data_blk_state, FILLED);
+
+	SpinLockAcquire(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock);
+	cstate->pcdata->pcshared_info->line_boundaries.ring[0].first_block = first_block;
+	cstate->pcdata->pcshared_info->line_boundaries.ring[0].current_block = first_block;
+	cstate->pcdata->pcshared_info->line_boundaries.ring[0].start_offset = start_offset;
+	cstate->pcdata->pcshared_info->line_boundaries.ring[0].cur_lineno = cur_lineno;
+	cstate->pcdata->pcshared_info->line_boundaries.pos_filled = true;
+	SpinLockRelease(&cstate->pcdata->pcshared_info->line_boundaries.worker_pos_lock);
+
+	cstate->pcdata->first_blk_info.first_block = -1;
+	cstate->pcdata->first_blk_info.current_block = -1;
+	cstate->pcdata->first_blk_info.start_offset = 0;
+	cstate->pcdata->first_blk_info.cur_lineno = 1;
+
+	//elog(LOG, "Setting first_block to %d", first_block);
+	//elog(LOG, "Setting offset to %d", start_offset);
 }
 
 /*
diff --git a/src/include/commands/copyfrom_internal.h b/src/include/commands/copyfrom_internal.h
index 67cf775..40105b0 100644
--- a/src/include/commands/copyfrom_internal.h
+++ b/src/include/commands/copyfrom_internal.h
@@ -43,7 +43,7 @@
  * should be a multiple of WORKER_CHUNK_COUNT, as wrap around cases is currently
  * not handled while selecting the WORKER_CHUNK_COUNT by the worker.
  */
-#define RINGSIZE (10 * 1024)
+#define RINGSIZE 1
 
 /*
  * While accessing DSM, each worker will pick the WORKER_CHUNK_COUNT records
@@ -205,6 +205,17 @@ typedef enum CopyInsertMethod
 	CIM_MULTI_CONDITIONAL		/* use table_multi_insert only if valid */
 } CopyInsertMethod;
 
+#define COPY_BLOCK_RESERVE_BYTES 8
+
+typedef enum BlockState
+{
+   FREE,       /* buffer is empty */
+   FILLED,     /* leader has filled the buffer with raw data */
+   READY,      /* start pos has been filled in, but no worker process has claimed the block yet */
+   PROCESSING, /* worker has claimed the block, and is processing it */
+
+}			BlockState;
+
 /*
  * Copy data block information.
  *
@@ -234,6 +245,9 @@ typedef struct ParallelCopyDataBlock
 	 */
 	bool		curr_blk_completed;
 
+	/* Current state of the block */
+	pg_atomic_uint32 data_blk_state;
+
 	/*
 	 * Few bytes need to be skipped from this block, this will be set when a
 	 * sequence of characters like \r\n is expected, but end of our block
@@ -242,8 +256,8 @@ typedef struct ParallelCopyDataBlock
 	 * Worker will use skip_bytes to know that this data must be skipped from
 	 * this data block.
 	 */
-	uint8		skip_bytes;
-	char		data[DATA_BLOCK_SIZE];	/* data read from file */
+	uint32		skip_bytes;
+	char		data[COPY_BLOCK_RESERVE_BYTES + DATA_BLOCK_SIZE];	/* data read from file */
 } ParallelCopyDataBlock;
 
 /*
@@ -283,6 +297,7 @@ typedef struct ParallelCopyLineBoundary
 	/* Position of the first block in data_blocks array. */
 	uint32		first_block;
 	uint32		start_offset;	/* start offset of the line */
+	uint32		current_block;
 
 	/*
 	 * Size of the current line -1 means line is yet to be filled completely,
@@ -303,6 +318,12 @@ typedef struct ParallelCopyLineBoundaries
 	uint32		worker_pos;			/* Worker's last blocked Position. */
 	slock_t		worker_pos_lock;	/* locks worker_pos shared variable. */
 
+	/*
+	 * Set when a worker has filled the ring entry; tells the other workers
+	 * whether they can pick it up or must wait until it is filled.
+	 */
+	bool		pos_filled;
+
 	/* Data read from the file/stdin by the leader process. */
 	ParallelCopyLineBoundary ring[RINGSIZE];
 } ParallelCopyLineBoundaries;
@@ -328,6 +349,9 @@ typedef struct ParallelCopyShmInfo
 	 * clause.
 	 */
 	pg_atomic_uint64 total_worker_processed;
+
+	pg_atomic_uint32 total_populated;
+	pg_atomic_uint32 total_processed;
 	uint64		populated;		/* lines populated by leader */
 	uint32		cur_block_pos;	/* current data block */
 	ParallelCopyDataBlock data_blocks[MAX_BLOCKS_COUNT];	/* data block array */
@@ -394,6 +418,8 @@ typedef struct ParallelCopyData
 	/* Current position in worker_line_buf */
 	uint32		worker_line_buf_pos;
 
+	ParallelCopyLineBoundary  first_blk_info;
+
 	/* For binary formatted files */
 	ParallelCopyDataBlock *curr_data_block;
 } ParallelCopyData;
@@ -528,12 +554,14 @@ extern uint32 GetLinePosition(CopyFromState cstate, uint32 line_pos);
 extern bool GetWorkerLine(CopyFromState cstate);
 extern bool CopyReadLine(CopyFromState cstate);
 extern uint32 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info);
-extern void SetRawBufForLoad(CopyFromState cstate, uint32 line_size, uint32 copy_buf_len,
-							 uint32 raw_buf_ptr, char **copy_raw_buf);
+extern void SetRawBufForLoad(CopyFromState cstate);
 extern uint32 UpdateSharedLineInfo(CopyFromState cstate, uint32 blk_pos, uint32 offset,
 								   uint32 line_size, uint32 line_state, uint32 blk_line_pos);
 extern void EndLineParallelCopy(CopyFromState cstate, uint32 line_pos, uint32 line_size,
 								uint32 raw_buf_ptr);
+extern bool ParallelCopyGetData(CopyFromState cstate);
+extern bool GetWorkerBlockPos(CopyFromState cstate, char **copy_raw_buf);
+extern bool CopyLoadRawBuf(CopyFromState cstate);
 
 extern int	CopyGetData(CopyFromState cstate, void *databuf, int minread, int maxread);
 extern int	CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes);
diff --git a/src/test/regress/expected/copy2.out b/src/test/regress/expected/copy2.out
index 22274cb..d87432a 100644
--- a/src/test/regress/expected/copy2.out
+++ b/src/test/regress/expected/copy2.out
@@ -767,7 +767,7 @@ ERROR:  column "d" specified more than once
 -- missing data: should fail
 COPY test_parallel_copy FROM stdin WITH (PARALLEL 1);
 ERROR:  invalid input syntax for type integer: ""
-CONTEXT:  COPY test_parallel_copy, line 0, column a: ""
+CONTEXT:  COPY test_parallel_copy, line 1, column a: ""
 parallel worker
 COPY test_parallel_copy FROM stdin WITH (PARALLEL 1);
 ERROR:  missing data for column "e"
-- 
1.8.3.1
