On Mon, 8 Mar 2021 at 09:24, Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Sun, Mar 07, 2021 at 04:50:31PM +0530, Bharath Rupireddy wrote:
> > Attaching remaining patches 0001 and 0003 from the v11 patch
> > set(posted upthread) here to make cfbot happier.
>
> Looking at patch 0002, the location of each progress report looks good
> to me.  I have some issues with some of the names chosen though, so I
> would like to suggest a few changes to simplify things:
> - PROGRESS_COPY_IO_TYPE_* => PROGRESS_COPY_TYPE_*
> - PROGRESS_COPY_IO_TYPE => PROGRESS_COPY_TYPE
> - PROGRESS_COPY_TYPE_STDIO => PROGRESS_COPY_TYPE_PIPE
> - In pg_stat_progress_copy, io_type => type

Seems reasonable. PFA updated patches. I've renamed the previous 0003
to 0002 to keep git-format-patch easy.

> It seems a bit confusing to not count insertions on foreign tables
> where nothing happened.  I am fine to live with that, but can I ask if
> this has been thought about?

This keeps the current behaviour of the implementation as committed
in 8a4f618e; the rationale of that patch was that this number should
mirror the number returned by the COPY command.

I am not opposed to adding another column for `tuples_inserted` and
changing the logic accordingly (see prototype 0003), but that was not
in the intended scope of this patchset. Unless you think it should be
included in the current patchset, I'll spin that patch off into a
separate thread, though I'm not sure it would make it into pg14.
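For illustration only, here is a minimal C sketch of how the prototype 0003 counters are meant to relate to each other. `CopyProgressSnapshot` and `copy_tuples_skipped()` are hypothetical names, not PostgreSQL code; the arithmetic is the one described in the documentation note added by 0003, assuming all three values are read from the same row of pg_stat_progress_copy.

```c
#include <assert.h>
#include <stdint.h>

/*
 * Hypothetical snapshot of one pg_stat_progress_copy row for COPY FROM,
 * using the columns added by prototype 0003. Not a PostgreSQL type.
 */
typedef struct CopyProgressSnapshot
{
	int64_t		tuples_processed;	/* all tuples read from the source */
	int64_t		tuples_inserted;	/* tuples actually inserted */
	int64_t		tuples_excluded;	/* tuples filtered out by COPY ... WHERE */
} CopyProgressSnapshot;

/*
 * Tuples that were read but neither excluded nor inserted, e.g. because a
 * BEFORE ROW or INSTEAD OF ROW trigger returned NULL ("do nothing").
 */
static int64_t
copy_tuples_skipped(const CopyProgressSnapshot *s)
{
	int64_t		skipped = s->tuples_processed
		- s->tuples_inserted
		- s->tuples_excluded;

	/*
	 * Assumes, as in 0003, that processed is reported together with every
	 * update of inserted or excluded, so it can never lag behind them.
	 */
	assert(skipped >= 0);
	return skipped;
}
```

With the numbers from the updated FILE import test (tuples_processed 3, tuples_inserted 2, tuples_excluded 1), this yields 0 skipped tuples.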

With regards,

Matthias van de Meent.
From 94876abe0ab9c28a6f4b0ac006f356251ca4746c Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekew...@gmail.com>
Date: Mon, 8 Mar 2021 16:08:24 +0100
Subject: [PATCH v12 3/3] Adapt COPY progress reporting to report processed and
 inserted tuples

Previously, tuples_processed was implied to be the number of tuples inserted
by COPY ... FROM. This patch changes tuples_processed to count all tuples
read from the source by the COPY ... FROM command, and makes the new
tuples_inserted count only the tuples that were actually inserted into the
table (that is, it does not count tuples filtered out by the WHERE clause,
nor tuples that were not inserted because of triggers or because of a
failure to insert into a foreign table).

This also renumbers the progress parameters so that they are back in column
order, before the stamping of pg14 makes those numbers effectively immutable.
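For reference, a minimal C sketch of the resulting layout, assuming the values in this patch's progress.h: pg_stat_get_progress_info() returns 0-based slot i as the column param(i + 1), which is why PROGRESS_COPY_COMMAND (0) now maps to S.param1 in the view. The lookup table and helper functions below are hypothetical illustrations, not code added by this patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Parameter indexes as renumbered by this patch in progress.h */
#define PROGRESS_COPY_COMMAND 0
#define PROGRESS_COPY_TYPE 1
#define PROGRESS_COPY_BYTES_PROCESSED 2
#define PROGRESS_COPY_BYTES_TOTAL 3
#define PROGRESS_COPY_TUPLES_PROCESSED 4
#define PROGRESS_COPY_TUPLES_INSERTED 5
#define PROGRESS_COPY_TUPLES_EXCLUDED 6

/* Hypothetical table: pg_stat_progress_copy column for each index */
static const char *const copy_progress_columns[] = {
	[PROGRESS_COPY_COMMAND] = "command",
	[PROGRESS_COPY_TYPE] = "type",
	[PROGRESS_COPY_BYTES_PROCESSED] = "bytes_processed",
	[PROGRESS_COPY_BYTES_TOTAL] = "bytes_total",
	[PROGRESS_COPY_TUPLES_PROCESSED] = "tuples_processed",
	[PROGRESS_COPY_TUPLES_INSERTED] = "tuples_inserted",
	[PROGRESS_COPY_TUPLES_EXCLUDED] = "tuples_excluded",
};

#define N_COPY_PROGRESS_COLUMNS \
	(sizeof(copy_progress_columns) / sizeof(copy_progress_columns[0]))

/* 0-based slot i is exposed as the 1-based column param(i + 1) */
static int
copy_progress_param_number(int index)
{
	assert(index >= 0 && (size_t) index < N_COPY_PROGRESS_COLUMNS);
	return index + 1;
}

/* Reverse lookup: parameter index for a view column, or -1 if unknown */
static int
copy_progress_index_of(const char *column)
{
	for (size_t i = 0; i < N_COPY_PROGRESS_COLUMNS; i++)
	{
		if (strcmp(copy_progress_columns[i], column) == 0)
			return (int) i;
	}
	return -1;
}
```

Keeping the indexes in column order means the paramN number in the view is simply the progress.h value plus one.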
---
 doc/src/sgml/monitoring.sgml         | 30 ++++++++++++++++++++++++
 src/backend/catalog/system_views.sql | 13 ++++++-----
 src/backend/commands/copyfrom.c      | 34 ++++++++++++++++++++++++----
 src/include/commands/progress.h      | 13 ++++++-----
 src/test/regress/expected/rules.out  | 13 ++++++-----
 src/test/regress/output/copy.source  |  4 ++--
 6 files changed, 83 insertions(+), 24 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 32bebc70db..aa2e15a748 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6595,6 +6595,24 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
       </para>
       <para>
        Number of tuples already processed by <command>COPY</command> command.
+       For <command>COPY FROM</command> this includes all tuples from the
+       source material (including tuples excluded by the
+       <command>WHERE</command> clause of the <command>COPY</command>
+       command, and tuples that were not inserted as a result of trigger
+       behaviour). For <command>COPY TO</command> this includes all tuples
+       that exist in the table, or those that are returned by the
+       <command>SELECT</command>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>tuples_inserted</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of tuples inserted into the table by the
+       <command>COPY FROM</command> command. Always <literal>0</literal> for
+       <command>COPY TO</command>.
       </para></entry>
      </row>
 
@@ -6610,6 +6628,18 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
     </tbody>
    </tgroup>
   </table>
+  <note>
+   <para>
+    The <literal>tuples_excluded</literal> column does not count tuples
+    that failed to insert; it only counts tuples that were available in the
+    source material but were not eligible for insertion because they were
+    excluded by the <command>WHERE</command> clause. The number of tuples
+    that were not inserted into the table due to triggers (or that otherwise
+    silently failed to insert) can be calculated by subtracting both
+    <literal>tuples_excluded</literal> and <literal>tuples_inserted</literal>
+    from <literal>tuples_processed</literal>.
+   </para>
+  </note>
  </sect2>
 
  </sect1>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8f9987f311..f59de36742 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1130,18 +1130,19 @@ CREATE VIEW pg_stat_progress_copy AS
     SELECT
         S.pid AS pid, S.datid AS datid, D.datname AS datname,
         S.relid AS relid,
-        CASE S.param5 WHEN 1 THEN 'COPY FROM'
+        CASE S.param1 WHEN 1 THEN 'COPY FROM'
                       WHEN 2 THEN 'COPY TO'
                       END AS command,
-        CASE S.param6 WHEN 1 THEN 'FILE'
+        CASE S.param2 WHEN 1 THEN 'FILE'
                       WHEN 2 THEN 'PROGRAM'
                       WHEN 3 THEN 'PIPE'
                       WHEN 4 THEN 'CALLBACK'
                       END AS "type",
-        S.param1 AS bytes_processed,
-        S.param2 AS bytes_total,
-        S.param3 AS tuples_processed,
-        S.param4 AS tuples_excluded
+        S.param3 AS bytes_processed,
+        S.param4 AS bytes_total,
+        S.param5 AS tuples_processed,
+        S.param6 AS tuples_inserted,
+        S.param7 AS tuples_excluded
     FROM pg_stat_get_progress_info('COPY') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8856bf215b..0aae5ae71b 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -540,6 +540,7 @@ CopyFrom(CopyFromState cstate)
        CopyInsertMethod insertMethod;
        CopyMultiInsertInfo multiInsertInfo = {0};      /* pacify compiler */
        int64           processed = 0;
+       int64           inserted = 0;
        int64           excluded = 0;
        bool            has_before_insert_row_trig;
        bool            has_instead_insert_row_trig;
@@ -854,6 +855,9 @@ CopyFrom(CopyFromState cstate)
                if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
                        break;
 
+               /* We're starting processing the next tuple, so update processed */
+               ++processed;
+
                ExecStoreVirtualTuple(myslot);
 
                /*
@@ -871,8 +875,15 @@ CopyFrom(CopyFromState cstate)
                        /* Skip items that don't match COPY's WHERE clause */
                        if (!ExecQual(cstate->qualexpr, econtext))
                        {
-                               /* Report that this tuple was filtered out by the WHERE clause */
-                               pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, ++excluded);
+                               const int progress_params[] = {
+                                       PROGRESS_COPY_TUPLES_PROCESSED,
+                                       PROGRESS_COPY_TUPLES_EXCLUDED
+                               };
+                               const int64 progress_vals[] = {
+                                       processed,
+                                       ++excluded
+                               };
+                               pgstat_progress_update_multi_param(2, progress_params, progress_vals);
                                continue;
                        }
                }
@@ -1001,8 +1012,18 @@ CopyFrom(CopyFromState cstate)
                                skip_tuple = true;      /* "do nothing" */
                }
 
-               if (!skip_tuple)
+               if (skip_tuple)
+                       pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processed);
+               else
                {
+                       const int progress_params[] = {
+                               PROGRESS_COPY_TUPLES_PROCESSED,
+                               PROGRESS_COPY_TUPLES_INSERTED
+                       };
+                       int64 progress_vals[] = {
+                               processed,
+                               0
+                       };
                        /*
                         * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
                         * tuple.  Otherwise, proceed with inserting the tuple into the
@@ -1073,7 +1094,11 @@ CopyFrom(CopyFromState cstate)
                                                                                                                                                                 NULL);
 
                                                if (myslot == NULL) /* "do nothing" */
+                                               {
+                                                       /* `processed` was updated at the top of this iteration; report the progress */
+                                                       pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processed);
                                                        continue;       /* next tuple please */
+                                               }
 
                                                /*
                                                 * AFTER ROW Triggers might reference the tableoid
@@ -1112,7 +1137,8 @@ CopyFrom(CopyFromState cstate)
                         * for counting tuples inserted by an INSERT command. Update
                         * progress of the COPY command as well.
                         */
-                       pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
+                       progress_vals[1] = ++inserted;
+                       pgstat_progress_update_multi_param(2, progress_params, progress_vals);
                }
        }
 
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 7884a11383..6fa78a5405 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -134,12 +134,13 @@
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL                 5
 
 /* Progress parameters for PROGRESS_COPY */
-#define PROGRESS_COPY_BYTES_PROCESSED 0
-#define PROGRESS_COPY_BYTES_TOTAL 1
-#define PROGRESS_COPY_TUPLES_PROCESSED 2
-#define PROGRESS_COPY_TUPLES_EXCLUDED 3
-#define PROGRESS_COPY_COMMAND 4
-#define PROGRESS_COPY_TYPE 5
+#define PROGRESS_COPY_COMMAND 0
+#define PROGRESS_COPY_TYPE 1
+#define PROGRESS_COPY_BYTES_PROCESSED 2
+#define PROGRESS_COPY_BYTES_TOTAL 3
+#define PROGRESS_COPY_TUPLES_PROCESSED 4
+#define PROGRESS_COPY_TUPLES_INSERTED 5
+#define PROGRESS_COPY_TUPLES_EXCLUDED 6
 
 /* Commands of PROGRESS_COPY_COMMAND */
 #define PROGRESS_COPY_COMMAND_FROM 1
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 02f5e7c905..8558fec72a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1950,22 +1950,23 @@ pg_stat_progress_copy| SELECT s.pid,
     s.datid,
     d.datname,
     s.relid,
-        CASE s.param5
+        CASE s.param1
             WHEN 1 THEN 'COPY FROM'::text
             WHEN 2 THEN 'COPY TO'::text
             ELSE NULL::text
         END AS command,
-        CASE s.param6
+        CASE s.param2
             WHEN 1 THEN 'FILE'::text
             WHEN 2 THEN 'PROGRAM'::text
             WHEN 3 THEN 'PIPE'::text
             WHEN 4 THEN 'CALLBACK'::text
             ELSE NULL::text
         END AS type,
-    s.param1 AS bytes_processed,
-    s.param2 AS bytes_total,
-    s.param3 AS tuples_processed,
-    s.param4 AS tuples_excluded
+    s.param3 AS bytes_processed,
+    s.param4 AS bytes_total,
+    s.param5 AS tuples_processed,
+    s.param6 AS tuples_inserted,
+    s.param7 AS tuples_excluded
   FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
diff --git a/src/test/regress/output/copy.source b/src/test/regress/output/copy.source
index 6819c1334c..eb3f38ea9e 100644
--- a/src/test/regress/output/copy.source
+++ b/src/test/regress/output/copy.source
@@ -203,11 +203,11 @@ create trigger check_after_progress_reporting
        execute function notice_after_progress_reporting();
 -- reporting of STDIO imports, and correct bytes-processed/tuples-processed reporting
 copy progress_reporting from stdin;
-INFO:  progress: {"type": "PIPE", "command": "COPY FROM", "datname": "regression", "bytes_total": 0, "bytes_processed": 79, "tuples_excluded": 0, "tuples_processed": 3}
+INFO:  progress: {"type": "PIPE", "command": "COPY FROM", "datname": "regression", "bytes_total": 0, "bytes_processed": 79, "tuples_excluded": 0, "tuples_inserted": 3, "tuples_processed": 3}
 -- reporting of FILE imports, and correct reporting of tuples-excluded
 copy progress_reporting from '@abs_srcdir@/data/emp.data'
        where (salary < 2000);
-INFO:  progress: {"type": "FILE", "command": "COPY FROM", "datname": "regression", "bytes_total": 79, "bytes_processed": 79, "tuples_excluded": 1, "tuples_processed": 2}
+INFO:  progress: {"type": "FILE", "command": "COPY FROM", "datname": "regression", "bytes_total": 79, "bytes_processed": 79, "tuples_excluded": 1, "tuples_inserted": 2, "tuples_processed": 3}
 -- cleanup progress_reporting
 drop trigger check_after_progress_reporting on progress_reporting;
 drop function notice_after_progress_reporting();
-- 
2.20.1

From 9add598d220ca7fd14c86d90cb93daf890436896 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekew...@gmail.com>
Date: Fri, 12 Feb 2021 14:06:44 +0100
Subject: [PATCH v12 1/3] Additional progress-reported components for COPY
 progress reporting

The command, the I/O type, and the number of tuples excluded by the WHERE
clause of COPY ... FROM ... WHERE are now reported in the
pg_stat_progress_copy view. Additionally, the column lines_processed is
renamed to tuples_processed, to disambiguate the meaning of this column for
CSV and BINARY copies and to stay consistent with the naming used in the
other pg_stat_progress_* views.

Of special interest is the reporting of the copy type, which lets us
distinguish the progress of logical replication's initial table
synchronization workers without having to join the pg_stat_activity view.
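As an illustration, a minimal C sketch of how a monitoring client could decode the raw command and type codes, mirroring the CASE expressions in the view. The codes are copied from this patch's progress.h; `copy_command_name()`, `copy_type_name()` and `copy_is_tablesync()` are hypothetical helpers, and the tablesync check only relies on the monitoring.sgml text in this patch, which says CALLBACK is used by the table synchronization worker.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Codes as defined in progress.h by this patch */
#define PROGRESS_COPY_COMMAND_FROM 1
#define PROGRESS_COPY_COMMAND_TO 2

#define PROGRESS_COPY_TYPE_FILE 1
#define PROGRESS_COPY_TYPE_PROGRAM 2
#define PROGRESS_COPY_TYPE_PIPE 3
#define PROGRESS_COPY_TYPE_CALLBACK 4

/*
 * Mirrors the CASE on the command parameter; NULL stands in for SQL NULL
 * (e.g. a slot that was never set and still reads as 0).
 */
static const char *
copy_command_name(int64_t code)
{
	switch (code)
	{
		case PROGRESS_COPY_COMMAND_FROM:
			return "COPY FROM";
		case PROGRESS_COPY_COMMAND_TO:
			return "COPY TO";
		default:
			return NULL;
	}
}

/* Mirrors the CASE on the type parameter */
static const char *
copy_type_name(int64_t code)
{
	switch (code)
	{
		case PROGRESS_COPY_TYPE_FILE:
			return "FILE";
		case PROGRESS_COPY_TYPE_PROGRAM:
			return "PROGRAM";
		case PROGRESS_COPY_TYPE_PIPE:
			return "PIPE";
		case PROGRESS_COPY_TYPE_CALLBACK:
			return "CALLBACK";
		default:
			return NULL;
	}
}

/* Assumption: a CALLBACK copy in core comes from the tablesync worker */
static int
copy_is_tablesync(int64_t type_code)
{
	return type_code == PROGRESS_COPY_TYPE_CALLBACK;
}
```

This is the kind of check the type column enables without a join against pg_stat_activity.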
---
 doc/src/sgml/monitoring.sgml         | 42 +++++++++++++++++++++++++---
 src/backend/catalog/system_views.sql | 11 +++++++-
 src/backend/commands/copyfrom.c      | 27 ++++++++++++++++--
 src/backend/commands/copyto.c        | 22 ++++++++++++---
 src/include/commands/progress.h      | 17 +++++++++--
 src/test/regress/expected/rules.out  | 15 +++++++++-
 6 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3513e127b7..32bebc70db 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6541,8 +6541,32 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
        <structfield>relid</structfield> <type>oid</type>
       </para>
       <para>
-       OID of the table on which the <command>COPY</command> command is executed.
-       It is set to 0 if copying from a <command>SELECT</command> query.
+       OID of the table on which the <command>COPY</command> command is being
+       executed. It is set to 0 if copying from a <command>SELECT</command>
+       query.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+       The command that is running: <literal>COPY FROM</literal>, or
+       <literal>COPY TO</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>type</structfield> <type>text</type>
+      </para>
+      <para>
+       The I/O type that the data is read from or written to:
+       <literal>FILE</literal>, <literal>PROGRAM</literal>,
+       <literal>PIPE</literal> (for <command>COPY FROM STDIN</command> and
+       <command>COPY TO STDOUT</command>), or <literal>CALLBACK</literal>
+       (used in the table synchronization background worker).
       </para></entry>
      </row>
 
@@ -6567,10 +6591,20 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
 
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>lines_processed</structfield> <type>bigint</type>
+       <structfield>tuples_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of tuples already processed by <command>COPY</command> command.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>tuples_excluded</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of lines already processed by <command>COPY</command> command.
+       Number of tuples not processed because they were excluded by the
+       <command>WHERE</command> clause of the <command>COPY</command> command.
       </para></entry>
      </row>
     </tbody>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fc94a73a54..8f9987f311 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1130,9 +1130,18 @@ CREATE VIEW pg_stat_progress_copy AS
     SELECT
         S.pid AS pid, S.datid AS datid, D.datname AS datname,
         S.relid AS relid,
+        CASE S.param5 WHEN 1 THEN 'COPY FROM'
+                      WHEN 2 THEN 'COPY TO'
+                      END AS command,
+        CASE S.param6 WHEN 1 THEN 'FILE'
+                      WHEN 2 THEN 'PROGRAM'
+                      WHEN 3 THEN 'PIPE'
+                      WHEN 4 THEN 'CALLBACK'
+                      END AS "type",
         S.param1 AS bytes_processed,
         S.param2 AS bytes_total,
-        S.param3 AS lines_processed
+        S.param3 AS tuples_processed,
+        S.param4 AS tuples_excluded
     FROM pg_stat_get_progress_info('COPY') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f05e2d2347..8856bf215b 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
 	BulkInsertState bistate = NULL;
 	CopyInsertMethod insertMethod;
 	CopyMultiInsertInfo multiInsertInfo = {0};	/* pacify compiler */
-	uint64		processed = 0;
+	int64		processed = 0;
+	int64		excluded = 0;
 	bool		has_before_insert_row_trig;
 	bool		has_instead_insert_row_trig;
 	bool		leafpart_use_multi_insert = false;
@@ -869,7 +870,11 @@ CopyFrom(CopyFromState cstate)
 			econtext->ecxt_scantuple = myslot;
 			/* Skip items that don't match COPY's WHERE clause */
 			if (!ExecQual(cstate->qualexpr, econtext))
+			{
+				/* Report that this tuple was filtered out by the WHERE clause */
+				pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, ++excluded);
 				continue;
+			}
 		}
 
 		/* Determine the partition to insert the tuple into */
@@ -1107,7 +1112,7 @@ CopyFrom(CopyFromState cstate)
 			 * for counting tuples inserted by an INSERT command. Update
 			 * progress of the COPY command as well.
 			 */
-			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
 		}
 	}
 
@@ -1193,6 +1198,16 @@ BeginCopyFrom(ParseState *pstate,
 	ExprState **defexprs;
 	MemoryContext oldcontext;
 	bool		volatile_defexprs;
+	const int	progress_cols[] = {
+		PROGRESS_COPY_COMMAND,
+		PROGRESS_COPY_TYPE,
+		PROGRESS_COPY_BYTES_TOTAL
+	};
+	int64		progress_vals[] = {
+		PROGRESS_COPY_COMMAND_FROM,
+		0,
+		0
+	};
 
 	/* Allocate workspace and zero all fields */
 	cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
@@ -1430,11 +1445,13 @@ BeginCopyFrom(ParseState *pstate,
 
 	if (data_source_cb)
 	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
 		cstate->copy_src = COPY_CALLBACK;
 		cstate->data_source_cb = data_source_cb;
 	}
 	else if (pipe)
 	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
 		Assert(!is_program);	/* the grammar does not allow this */
 		if (whereToSendOutput == DestRemote)
 			ReceiveCopyBegin(cstate);
@@ -1447,6 +1464,7 @@ BeginCopyFrom(ParseState *pstate,
 
 		if (cstate->is_program)
 		{
+			progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
 			if (cstate->copy_file == NULL)
 				ereport(ERROR,
@@ -1458,6 +1476,7 @@ BeginCopyFrom(ParseState *pstate,
 		{
 			struct stat st;
 
+			progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
 			cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
 			if (cstate->copy_file == NULL)
 			{
@@ -1484,10 +1503,12 @@ BeginCopyFrom(ParseState *pstate,
 						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
 						 errmsg("\"%s\" is a directory", cstate->filename)));
 
-			pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
+			progress_vals[2] = st.st_size;
 		}
 	}
 
+	pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
+
 	if (cstate->opts.binary)
 	{
 		/* Read and verify binary header */
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 46155015cf..3b5a20a065 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate,
 	TupleDesc	tupDesc;
 	int			num_phys_attrs;
 	MemoryContext oldcontext;
+	const int	progress_cols[] = {
+		PROGRESS_COPY_COMMAND,
+		PROGRESS_COPY_TYPE
+	};
+	int64		progress_vals[] = {
+		PROGRESS_COPY_COMMAND_TO,
+		0
+	};
 
 	if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
 	{
@@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate,
 
 	if (pipe)
 	{
+		progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
+
 		Assert(!is_program);	/* the grammar does not allow this */
 		if (whereToSendOutput != DestRemote)
 			cstate->copy_file = stdout;
@@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate,
 
 		if (is_program)
 		{
+			progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
 			cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
 			if (cstate->copy_file == NULL)
 				ereport(ERROR,
@@ -682,6 +693,7 @@ BeginCopyTo(ParseState *pstate,
 			mode_t		oumask; /* Pre-existing umask value */
 			struct stat st;
 
+			progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
 			/*
 			 * Prevent write to relative path ... too easy to shoot oneself in
 			 * the foot by overwriting a database file ...
@@ -731,6 +743,8 @@ BeginCopyTo(ParseState *pstate,
 	/* initialize progress */
 	pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
 								  cstate->rel ? RelationGetRelid(cstate->rel) : InvalidOid);
+	pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
+
 	cstate->bytes_processed = 0;
 
 	MemoryContextSwitchTo(oldcontext);
@@ -881,8 +895,8 @@ DoCopyTo(CopyToState cstate)
 			/* Format and send the data */
 			CopyOneRowTo(cstate, slot);
 
-			/* Increment amount of processed tuples and update the progress */
-			pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+			/* Increment the number of processed tuples, and report the progress */
+			pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
 		}
 
 		ExecDropSingleTupleTableSlot(slot);
@@ -1251,8 +1265,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
 	/* Send the data */
 	CopyOneRowTo(cstate, slot);
 
-	/* Increment amount of processed tuples and update the progress */
-	pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
+	/* Increment the number of processed tuples, and report the progress */
+	pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++myState->processed);
 
 	return true;
 }
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 95ec5d02e9..7884a11383 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -133,9 +133,22 @@
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE		4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL			5
 
-/* Commands of PROGRESS_COPY */
+/* Progress parameters for PROGRESS_COPY */
 #define PROGRESS_COPY_BYTES_PROCESSED 0
 #define PROGRESS_COPY_BYTES_TOTAL 1
-#define PROGRESS_COPY_LINES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_EXCLUDED 3
+#define PROGRESS_COPY_COMMAND 4
+#define PROGRESS_COPY_TYPE 5
+
+/* Commands of PROGRESS_COPY_COMMAND */
+#define PROGRESS_COPY_COMMAND_FROM 1
+#define PROGRESS_COPY_COMMAND_TO 2
+
+/* Types of PROGRESS_COPY_TYPE */
+#define PROGRESS_COPY_TYPE_FILE 1
+#define PROGRESS_COPY_TYPE_PROGRAM 2
+#define PROGRESS_COPY_TYPE_PIPE 3
+#define PROGRESS_COPY_TYPE_CALLBACK 4
 
 #endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b1c9b7bdfe..02f5e7c905 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1950,9 +1950,22 @@ pg_stat_progress_copy| SELECT s.pid,
     s.datid,
     d.datname,
     s.relid,
+        CASE s.param5
+            WHEN 1 THEN 'COPY FROM'::text
+            WHEN 2 THEN 'COPY TO'::text
+            ELSE NULL::text
+        END AS command,
+        CASE s.param6
+            WHEN 1 THEN 'FILE'::text
+            WHEN 2 THEN 'PROGRAM'::text
+            WHEN 3 THEN 'PIPE'::text
+            WHEN 4 THEN 'CALLBACK'::text
+            ELSE NULL::text
+        END AS type,
     s.param1 AS bytes_processed,
     s.param2 AS bytes_total,
-    s.param3 AS lines_processed
+    s.param3 AS tuples_processed,
+    s.param4 AS tuples_excluded
    FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
-- 
2.20.1

From c26e13408c97dfd469f753a7341730f543ca0084 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= <josef.sima...@gmail.com>
Date: Tue, 9 Feb 2021 13:06:45 +0100
Subject: [PATCH v12 2/3] Add copy progress reporting regression tests.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This tests some basic features of copy progress reporting.

Sadly, we can only take one snapshot of the progress report per
transaction / statement, so we can't (easily) get intermediate results
without running a separate statement for each result.

Author: Josef Šimánek, Matthias van de Meent
---
 src/test/regress/input/copy.source  | 60 +++++++++++++++++++++++++++++
 src/test/regress/output/copy.source | 47 ++++++++++++++++++++++
 2 files changed, 107 insertions(+)

diff --git a/src/test/regress/input/copy.source b/src/test/regress/input/copy.source
index a1d529ad36..4f1cbc73d2 100644
--- a/src/test/regress/input/copy.source
+++ b/src/test/regress/input/copy.source
@@ -201,3 +201,63 @@ select * from parted_copytest where b = 1;
 select * from parted_copytest where b = 2;
 
 drop table parted_copytest;
+
+--
+-- progress reporting
+--
+
+-- setup
+-- reuse employer datatype, that has a small sized data set
+
+create table progress_reporting (
+	name 		text,
+	age			int4,
+	location 	point,
+	salary 		int4,
+	manager 	name
+);
+
+-- Add a trigger to 'raise info' the contents of pg_stat_progress_copy. This
+-- allows us to test and verify the contents of this view, which would
+-- otherwise require a concurrent session checking that table.
+create function notice_after_progress_reporting() returns trigger AS
+$$
+declare report record;
+begin
+	-- We cannot expect 'pid' nor 'relid' to be consistent over runs due to
+	-- variance in system process ids, and concurrency in runs of tests.
+	-- Additionally, due to the usage of this test in pg_regress, the 'datid'
+	-- also is not consistent between runs.
+	select into report (to_jsonb(r) - '{pid,relid,datid}'::text[]) as value
+		from pg_stat_progress_copy r
+		where pid = pg_backend_pid();
+
+	raise info 'progress: %', report.value::text;
+
+	RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+create trigger check_after_progress_reporting
+	after insert on progress_reporting
+	for each statement
+	execute function notice_after_progress_reporting();
+
+-- reporting of STDIO imports, and correct bytes-processed/tuples-processed reporting
+
+copy progress_reporting from stdin;
+sharon	25	(15,12)	1000	sam
+sam	30	(10,5)	2000	bill
+bill	20	(11,10)	1000	sharon
+\.
+
+-- reporting of FILE imports, and correct reporting of tuples-excluded
+
+copy progress_reporting from '@abs_srcdir@/data/emp.data'
+	where (salary < 2000);
+
+-- cleanup progress_reporting
+
+drop trigger check_after_progress_reporting on progress_reporting;
+drop function notice_after_progress_reporting();
+drop table progress_reporting;
diff --git a/src/test/regress/output/copy.source b/src/test/regress/output/copy.source
index 938d3551da..6819c1334c 100644
--- a/src/test/regress/output/copy.source
+++ b/src/test/regress/output/copy.source
@@ -165,3 +165,50 @@ select * from parted_copytest where b = 2;
 (1 row)
 
 drop table parted_copytest;
+--
+-- progress reporting
+--
+-- setup
+-- reuse employer datatype, that has a small sized data set
+create table progress_reporting (
+	name 		text,
+	age			int4,
+	location 	point,
+	salary 		int4,
+	manager 	name
+);
+-- Add a trigger to 'raise info' the contents of pg_stat_progress_copy. This
+-- allows us to test and verify the contents of this view, which would
+-- otherwise require a concurrent session checking that table.
+create function notice_after_progress_reporting() returns trigger AS
+$$
+declare report record;
+begin
+	-- We cannot expect 'pid' nor 'relid' to be consistent over runs due to
+	-- variance in system process ids, and concurrency in runs of tests.
+	-- Additionally, due to the usage of this test in pg_regress, the 'datid'
+	-- also is not consistent between runs.
+	select into report (to_jsonb(r) - '{pid,relid,datid}'::text[]) as value
+		from pg_stat_progress_copy r
+		where pid = pg_backend_pid();
+
+	raise info 'progress: %', report.value::text;
+
+	RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+create trigger check_after_progress_reporting
+	after insert on progress_reporting
+	for each statement
+	execute function notice_after_progress_reporting();
+-- reporting of STDIO imports, and correct bytes-processed/tuples-processed reporting
+copy progress_reporting from stdin;
+INFO:  progress: {"type": "PIPE", "command": "COPY FROM", "datname": "regression", "bytes_total": 0, "bytes_processed": 79, "tuples_excluded": 0, "tuples_processed": 3}
+-- reporting of FILE imports, and correct reporting of tuples-excluded
+copy progress_reporting from '@abs_srcdir@/data/emp.data'
+	where (salary < 2000);
+INFO:  progress: {"type": "FILE", "command": "COPY FROM", "datname": "regression", "bytes_total": 79, "bytes_processed": 79, "tuples_excluded": 1, "tuples_processed": 2}
+-- cleanup progress_reporting
+drop trigger check_after_progress_reporting on progress_reporting;
+drop function notice_after_progress_reporting();
+drop table progress_reporting;
-- 
2.20.1
