From 28c5b37c2271b623f6bc4653d17f92dedb8722be Mon Sep 17 00:00:00 2001
From: Bharath Rupireddy <bharath.rupireddy@enterprisedb.com>
Date: Tue, 22 Sep 2020 15:12:27 +0530
Subject: [PATCH v2] Parallel Copy Exec Time Capture

A testing patch for capturing various timings such as total copy
time in leader and worker, index insertion time, leader and worker
waiting time.
---
 src/backend/commands/copy.c | 74 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 73 insertions(+), 1 deletion(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 5b1884acd8..cb72949e0e 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -65,6 +65,14 @@
 #define ISOCTAL(c) (((c) >= '0') && ((c) <= '7'))
 #define OCTVALUE(c) ((c) - '0')
 
+/* Global variables for capturing parallel copy execution times. */
+double totalcopytime;
+double totalcopytimeworker;
+double totalcopyleaderwaitingtime;
+double totalcopyworkerwaitingtime;
+double totaltableinsertiontime;
+double totalindexinsertiontime;
+
 /*
  * Represents the different source/dest cases we need to worry about at
  * the bottom level
@@ -1332,9 +1340,16 @@ CacheLineInfo(CopyState cstate, uint32 buff_count)
 	uint32 offset;
 	int dataSize;
 	int copiedSize = 0;
+	struct timespec before, after;
+	struct timespec before1, after1;
 
 	resetStringInfo(&pcdata->worker_line_buf[buff_count].line_buf);
+	INSTR_TIME_SET_CURRENT(before);
 	write_pos = GetLinePosition(cstate);
+	INSTR_TIME_SET_CURRENT(after);
+	INSTR_TIME_SUBTRACT(after, before);
+	totalcopyworkerwaitingtime += INSTR_TIME_GET_MILLISEC(after);
+
 	if (-1 == write_pos)
 		return true;
 
@@ -1436,6 +1451,7 @@ CacheLineInfo(CopyState cstate, uint32 buff_count)
 			data_blk_ptr = &pcshared_info->data_blocks[data_blk_ptr->following_block];
 		}
 
+		INSTR_TIME_SET_CURRENT(before1);
 		for (;;)
 		{
 			/* Get the size of this line */
@@ -1455,6 +1471,9 @@ CacheLineInfo(CopyState cstate, uint32 buff_count)
 
 			COPY_WAIT_TO_PROCESS()
 		}
+		INSTR_TIME_SET_CURRENT(after1);
+		INSTR_TIME_SUBTRACT(after1, before1);
+		totalcopyworkerwaitingtime += INSTR_TIME_GET_MILLISEC(after1);
 	}
 
 empty_data_line_update:
@@ -1538,6 +1557,11 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc)
 	char *convertListStr = NULL;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
+	struct timespec before, after;
+	totalcopytimeworker = 0;
+	totalcopyworkerwaitingtime = 0;
+	totaltableinsertiontime = 0;
+	totalindexinsertiontime = 0;
 
 	/* Allocate workspace and zero all fields. */
 	cstate = (CopyStateData *) palloc0(sizeof(CopyStateData));
@@ -1606,7 +1630,15 @@ ParallelCopyMain(dsm_segment *seg, shm_toc *toc)
 	cstate->rel = rel;
 	InitializeParallelCopyInfo(shared_cstate, cstate, attlist);
 
+	INSTR_TIME_SET_CURRENT(before);
 	CopyFrom(cstate);
+	INSTR_TIME_SET_CURRENT(after);
+	INSTR_TIME_SUBTRACT(after, before);
+	totalcopytimeworker += INSTR_TIME_GET_MILLISEC(after);
+	ereport(LOG, (errmsg("totalcopyworkerwaitingtime = %.3f ms", totalcopyworkerwaitingtime), errhidestmt(true)));
+	ereport(LOG, (errmsg("totaltableinsertiontime = %.3f ms", totaltableinsertiontime), errhidestmt(true)));
+	ereport(LOG, (errmsg("totalindexinsertiontime = %.3f ms", totalindexinsertiontime), errhidestmt(true)));
+	ereport(LOG, (errmsg("totalcopytimeworker = %.3f ms", totalcopytimeworker), errhidestmt(true)));
 
 	if (rel != NULL)
 		table_close(rel, RowExclusiveLock);
@@ -1633,11 +1665,16 @@ UpdateBlockInLineInfo(CopyState cstate, uint32 blk_pos,
 	ParallelCopyLineBoundaries *lineBoundaryPtr = &pcshared_info->line_boundaries;
 	ParallelCopyLineBoundary *lineInfo;
 	int line_pos = lineBoundaryPtr->pos;
+	struct timespec before, after;
 
 	/* Update the line information for the worker to pick and process. */
 	lineInfo = &lineBoundaryPtr->ring[line_pos];
+	INSTR_TIME_SET_CURRENT(before);
 	while (pg_atomic_read_u32(&lineInfo->line_size) != -1)
 		COPY_WAIT_TO_PROCESS()
+	INSTR_TIME_SET_CURRENT(after);
+	INSTR_TIME_SUBTRACT(after, before);
+	totalcopyleaderwaitingtime += INSTR_TIME_GET_MILLISEC(after);
 
 	lineInfo->first_block = blk_pos;
 	lineInfo->start_offset = offset;
@@ -2203,6 +2240,8 @@ static uint32
 WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info)
 {
 	uint32 new_free_pos = -1;
+	struct timespec before, after;
+	INSTR_TIME_SET_CURRENT(before);
 	for (;;)
 	{
 		new_free_pos = GetFreeCopyBlock(pcshared_info);
@@ -2211,7 +2250,9 @@ WaitGetFreeCopyBlock(ParallelCopyShmInfo *pcshared_info)
 
 		COPY_WAIT_TO_PROCESS()
 	}
-
+	INSTR_TIME_SET_CURRENT(after);
+	INSTR_TIME_SUBTRACT(after, before);
+	totalcopyleaderwaitingtime += INSTR_TIME_GET_MILLISEC(after);
 	return new_free_pos;
 }
 
@@ -3083,12 +3124,21 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 	if (is_from)
 	{
 		ParallelContext *pcxt = NULL;
+		struct timespec before, after;
 		Assert(rel);
+		totalcopytime = 0;
+		totalcopytimeworker = 0;
+		totalcopyleaderwaitingtime = 0;
+		totalcopyworkerwaitingtime = 0;
+		totaltableinsertiontime = 0;
+		totalindexinsertiontime = 0;
 
 		/* check read-only transaction and parallel mode */
 		if (XactReadOnly && !rel->rd_islocaltemp)
 			PreventCommandIfReadOnly("COPY FROM");
 
+		INSTR_TIME_SET_CURRENT(before);
+
 		cstate = BeginCopyFrom(pstate, rel, stmt->filename, stmt->is_program,
 							   NULL, stmt->attlist, stmt->options);
 		cstate->whereClause = whereClause;
@@ -3119,6 +3169,18 @@ DoCopy(ParseState *pstate, const CopyStmt *stmt,
 		}
 
 		EndCopyFrom(cstate);
+
+		INSTR_TIME_SET_CURRENT(after);
+		INSTR_TIME_SUBTRACT(after, before);
+		totalcopytime += INSTR_TIME_GET_MILLISEC(after);
+		if (pcxt != NULL)
+			ereport(LOG, (errmsg("totalcopyleaderwaitingtime = %.3f ms", totalcopyleaderwaitingtime), errhidestmt(true)));
+		if (pcxt == NULL)
+		{
+			ereport(LOG, (errmsg("totaltableinsertiontime = %.3f ms", totaltableinsertiontime), errhidestmt(true)));
+			ereport(LOG, (errmsg("totalindexinsertiontime = %.3f ms", totalindexinsertiontime), errhidestmt(true)));
+		}
+		ereport(LOG, (errmsg("totalcopytime = %.3f ms", totalcopytime), errhidestmt(true)));
 	}
 	else
 	{
@@ -4527,6 +4589,8 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	int			nused = buffer->nused;
 	ResultRelInfo *resultRelInfo = buffer->resultRelInfo;
 	TupleTableSlot **slots = buffer->slots;
+	struct timespec before, after;
+	struct timespec before1, after1;
 
 	/* Set es_result_relation_info to the ResultRelInfo we're flushing. */
 	estate->es_result_relation_info = resultRelInfo;
@@ -4543,14 +4607,19 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 	 * context before calling it.
 	 */
 	oldcontext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+	INSTR_TIME_SET_CURRENT(before);
 	table_multi_insert(resultRelInfo->ri_RelationDesc,
 					   slots,
 					   nused,
 					   mycid,
 					   ti_options,
 					   buffer->bistate);
+	INSTR_TIME_SET_CURRENT(after);
+	INSTR_TIME_SUBTRACT(after, before);
+	totaltableinsertiontime += INSTR_TIME_GET_MILLISEC(after);
 	MemoryContextSwitchTo(oldcontext);
 
+	INSTR_TIME_SET_CURRENT(before1);
 	for (i = 0; i < nused; i++)
 	{
 		/*
@@ -4586,6 +4655,9 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
 
 		ExecClearTuple(slots[i]);
 	}
+	INSTR_TIME_SET_CURRENT(after1);
+	INSTR_TIME_SUBTRACT(after1, before1);
+	totalindexinsertiontime += INSTR_TIME_GET_MILLISEC(after1);
 
 	/* Mark that all slots are free */
 	buffer->nused = 0;
-- 
2.25.1

