From f8ed06cfee691d9ba9602a93113b9a4debe97b5b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= <josef.simanek@gmail.com>
Date: Mon, 8 Jun 2020 02:00:53 +0200
Subject: [PATCH 1/2] Initial work on COPY progress.

---
 src/backend/commands/copy.c         | 13 ++++++++++---
 src/backend/utils/adt/pgstatfuncs.c |  2 ++
 src/include/commands/progress.h     |  3 +++
 src/include/pgstat.h                |  3 ++-
 4 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6d53dc463c18..6c66a1631a40 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -29,6 +29,7 @@
 #include "catalog/pg_type.h"
 #include "commands/copy.h"
 #include "commands/defrem.h"
+#include "commands/progress.h"
 #include "commands/trigger.h"
 #include "executor/execPartition.h"
 #include "executor/executor.h"
@@ -45,6 +46,7 @@
 #include "parser/parse_collate.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
+#include "pgstat.h"
 #include "port/pg_bswap.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/fd.h"
@@ -1752,6 +1754,9 @@ BeginCopy(ParseState *pstate,
 
 	cstate->copy_dest = COPY_FILE;	/* default */
 
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, queryRelId);
+	pgstat_progress_update_param(PROGRESS_COPY_PROCESSED,0);
+
 	MemoryContextSwitchTo(oldcontext);
 
 	return cstate;
@@ -1811,6 +1816,8 @@ EndCopy(CopyState cstate)
 							cstate->filename)));
 	}
 
+	pgstat_progress_end_command();
+
 	MemoryContextDelete(cstate->copycontext);
 	pfree(cstate);
 }
@@ -2123,7 +2130,7 @@ CopyTo(CopyState cstate)
 
 			/* Format and send the data */
 			CopyOneRowTo(cstate, slot);
-			processed++;
+			pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed);
 		}
 
 		ExecDropSingleTupleTableSlot(slot);
@@ -3262,7 +3269,7 @@ CopyFrom(CopyState cstate)
 			 * or FDW; this is the same definition used by nodeModifyTable.c
 			 * for counting tuples inserted by an INSERT command.
 			 */
-			processed++;
+			pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed);
 		}
 	}
 
@@ -5119,7 +5126,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the data */
 	CopyOneRowTo(cstate, slot);
-	myState->processed++;
+	pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++myState->processed);
 
 	return true;
 }
diff --git a/src/backend/utils/adt/pgstatfuncs.c b/src/backend/utils/adt/pgstatfuncs.c
index 2aff739466ff..b740eef7c102 100644
--- a/src/backend/utils/adt/pgstatfuncs.c
+++ b/src/backend/utils/adt/pgstatfuncs.c
@@ -494,6 +494,8 @@ pg_stat_get_progress_info(PG_FUNCTION_ARGS)
 		cmdtype = PROGRESS_COMMAND_CREATE_INDEX;
 	else if (pg_strcasecmp(cmd, "BASEBACKUP") == 0)
 		cmdtype = PROGRESS_COMMAND_BASEBACKUP;
+	else if (pg_strcasecmp(cmd, "COPY") == 0)
+		cmdtype = PROGRESS_COMMAND_COPY;
 	else
 		ereport(ERROR,
 				(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 36b073e67757..2d72f12a75b5 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -133,4 +133,7 @@
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE		4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL			5
 
+/* Commands of PROGRESS_CLUSTER */
+#define PROGRESS_COPY_PROCESSED    0
+
 #endif
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index c55dc1481ca5..45b8006ff2b8 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -994,7 +994,8 @@ typedef enum ProgressCommandType
 	PROGRESS_COMMAND_ANALYZE,
 	PROGRESS_COMMAND_CLUSTER,
 	PROGRESS_COMMAND_CREATE_INDEX,
-	PROGRESS_COMMAND_BASEBACKUP
+	PROGRESS_COMMAND_BASEBACKUP,
+	PROGRESS_COMMAND_COPY
 } ProgressCommandType;
 
 #define PGSTAT_NUM_PROGRESS_PARAM	20

From bb528a21dcf8d12b06961d8fd591d3ac3bbf2c5b Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= <josef.simanek@gmail.com>
Date: Sun, 14 Jun 2020 02:46:04 +0200
Subject: [PATCH 2/2] Enhance copy progress with more info. - add
 pg_stat_progress_copy system view

---
 src/backend/catalog/system_views.sql | 14 ++++++++++++++
 src/backend/commands/copy.c          | 16 +++++++++++-----
 src/include/commands/progress.h      |  8 ++++++--
 src/test/regress/expected/rules.out  | 15 +++++++++++++++
 4 files changed, 46 insertions(+), 7 deletions(-)

diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 56420bbc9d6f..05f995b43937 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1093,6 +1093,20 @@ CREATE VIEW pg_stat_progress_basebackup AS
 	S.param5 AS tablespaces_streamed
     FROM pg_stat_get_progress_info('BASEBACKUP') AS S;
 
+CREATE VIEW pg_stat_progress_copy AS
+    SELECT
+        S.pid AS pid, S.datid AS datid, D.datname AS datname,
+        S.relid AS relid,
+        CASE S.param1 WHEN 0 THEN 'TO'
+                      WHEN 1 THEN 'FROM'
+                      END as direction,
+        CAST (S.param2::integer AS bool) AS file,
+        CAST (S.param3::integer AS bool) AS program,
+        S.param4 AS lines_processed,
+        S.param5 AS file_bytes_processed
+    FROM pg_stat_get_progress_info('COPY') AS S
+        LEFT JOIN pg_database D ON S.datid = D.oid;
+
 CREATE VIEW pg_user_mappings AS
     SELECT
         U.oid       AS umid,
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 6c66a1631a40..3462e4f414d1 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -566,6 +566,7 @@ CopySendEndOfRow(CopyState cstate)
 							(errcode_for_file_access(),
 							 errmsg("could not write to COPY file: %m")));
 			}
+			pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, ftell(cstate->copy_file));
 			break;
 		case COPY_OLD_FE:
 			/* The FE/BE protocol uses \n as newline for all platforms */
@@ -618,6 +619,7 @@ CopyGetData(CopyState cstate, void *databuf, int minread, int maxread)
 	{
 		case COPY_FILE:
 			bytesread = fread(databuf, 1, maxread, cstate->copy_file);
+			pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, ftell(cstate->copy_file));
 			if (ferror(cstate->copy_file))
 				ereport(ERROR,
 						(errcode_for_file_access(),
@@ -1754,8 +1756,12 @@ BeginCopy(ParseState *pstate,
 
 	cstate->copy_dest = COPY_FILE;	/* default */
 
-	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, queryRelId);
-	pgstat_progress_update_param(PROGRESS_COPY_PROCESSED,0);
+	pgstat_progress_start_command(PROGRESS_COMMAND_COPY, cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+
+	pgstat_progress_update_param(PROGRESS_COPY_BYTES_PROCESSED, 0);
+	pgstat_progress_update_param(PROGRESS_COPY_IS_FROM, (int) cstate->is_copy_from);
+	pgstat_progress_update_param(PROGRESS_COPY_IS_FILE, (int) cstate->copy_dest == COPY_FILE);
+	pgstat_progress_update_param(PROGRESS_COPY_IS_PROGRAM, (int) cstate->is_program);
 
 	MemoryContextSwitchTo(oldcontext);
 
@@ -2130,7 +2136,7 @@ CopyTo(CopyState cstate)
 
 			/* Format and send the data */
 			CopyOneRowTo(cstate, slot);
-			pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed);
+			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 		}
 
 		ExecDropSingleTupleTableSlot(slot);
@@ -3269,7 +3275,7 @@ CopyFrom(CopyState cstate)
 			 * or FDW; this is the same definition used by nodeModifyTable.c
 			 * for counting tuples inserted by an INSERT command.
 			 */
-			pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++processed);
+			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
 		}
 	}
 
@@ -5126,7 +5132,7 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 
 	/* Send the data */
 	CopyOneRowTo(cstate, slot);
-	pgstat_progress_update_param(PROGRESS_COPY_PROCESSED, ++myState->processed);
+	pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
 
 	return true;
 }
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 2d72f12a75b5..3947222e6f61 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -133,7 +133,11 @@
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE		4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL			5
 
-/* Commands of PROGRESS_CLUSTER */
-#define PROGRESS_COPY_PROCESSED    0
+/* Commands of PROGRESS_COPY */
+#define PROGRESS_COPY_IS_FROM         0
+#define PROGRESS_COPY_IS_FILE         1
+#define PROGRESS_COPY_IS_PROGRAM      2
+#define PROGRESS_COPY_LINES_PROCESSED 3
+#define PROGRESS_COPY_BYTES_PROCESSED 4
 
 #endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b813e322153d..dac5da4e6dd9 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1925,6 +1925,21 @@ pg_stat_progress_cluster| SELECT s.pid,
     s.param8 AS index_rebuild_count
    FROM (pg_stat_get_progress_info('CLUSTER'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
+pg_stat_progress_copy| SELECT s.pid,
+    s.datid,
+    d.datname,
+    s.relid,
+        CASE s.param1
+            WHEN 0 THEN 'TO'::text
+            WHEN 1 THEN 'FROM'::text
+            ELSE NULL::text
+        END AS direction,
+    ((s.param2)::integer)::boolean AS file,
+    ((s.param3)::integer)::boolean AS program,
+    s.param4 AS lines_processed,
+    s.param5 AS file_bytes_processed
+   FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
+     LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
     s.datid,
     d.datname,
