On Mon, 8 Mar 2021 at 09:24, Michael Paquier <mich...@paquier.xyz> wrote:
>
> On Sun, Mar 07, 2021 at 04:50:31PM +0530, Bharath Rupireddy wrote:
> > Attaching remaining patches 0001 and 0003 from the v11 patch
> > set(posted upthread) here to make cfbot happier.
>
> Looking at patch 0002, the location of each progress report looks good
> to me.  I have some issues with some of the names chosen though, so I
> would like to suggest a few changes to simplify things:
> - PROGRESS_COPY_IO_TYPE_* => PROGRESS_COPY_TYPE_*
> - PROGRESS_COPY_IO_TYPE => PROGRESS_COPY_TYPE
> - PROGRESS_COPY_TYPE_STDIO => PROGRESS_COPY_TYPE_PIPE
> - In pg_stat_progress_copy, io_type => type
Seems reasonable. PFA updated patches. I've renamed the previous 0003
to 0002 to keep git-format-patch easy.

> It seems a bit confusing to not count insertions on foreign tables
> where nothing happened.  I am fine to live with that, but can I ask if
> this has been thought about?

This keeps the current behaviour of the implementation as committed in
8a4f618e, the rationale of that patch being that this number should
mirror the number returned by the COPY command. I am not opposed to
adding another column for `tuples_inserted` and changing the logic
accordingly (see prototype 0003), but that was not in the intended
scope of this patchset. Unless you think it should be included in the
current patchset, I'll spin that patch out into a different thread,
though I'm not sure it would make it into pg14.

With regards,

Matthias van de Meent.
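For reference, with the prototype 0003 applied, a second session could watch a running COPY and derive the silently-failed tuple count as described in the doc note. A sketch (column names taken from the patched view; `tuples_failed` is just a derived alias, not a real column):

```sql
-- Watch a concurrent COPY FROM; tuples that were neither inserted nor
-- excluded by the WHERE clause must have been dropped by triggers or
-- by a foreign table's FDW.
SELECT relid::regclass,
       command,
       type,
       tuples_processed,
       tuples_inserted,
       tuples_excluded,
       tuples_processed - tuples_inserted - tuples_excluded AS tuples_failed
FROM pg_stat_progress_copy;
```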
From 94876abe0ab9c28a6f4b0ac006f356251ca4746c Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekew...@gmail.com>
Date: Mon, 8 Mar 2021 16:08:24 +0100
Subject: [PATCH v12 3/3] Adapt COPY progress reporting to report processed
 and inserted tuples

Previously, tuples_processed was implied to be the number of tuples
inserted by COPY ... FROM. This is now changed so that tuples_processed
counts all tuples that were identified in the COPY ... FROM command,
while tuples_inserted counts all tuples that were inserted into the
table (that is, it excludes the tuples filtered out by the WHERE
clause, and those that were not inserted due to triggers, or failure
to insert into foreign tables).

This also renumbers the columns to be back in order, before the
stamping of pg14 makes those numbers effectively immutable.
---
 doc/src/sgml/monitoring.sgml         | 30 ++++++++++++++++++++++++
 src/backend/catalog/system_views.sql | 13 ++++++-----
 src/backend/commands/copyfrom.c      | 34 ++++++++++++++++++++++++----
 src/include/commands/progress.h      | 13 ++++++-----
 src/test/regress/expected/rules.out  | 13 ++++++-----
 src/test/regress/output/copy.source  |  4 ++--
 6 files changed, 83 insertions(+), 24 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 32bebc70db..aa2e15a748 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6595,6 +6595,24 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
       </para>
       <para>
        Number of tuples already processed by <command>COPY</command> command.
+       For <command>COPY FROM</command> this includes all tuples from the
+       source material (including tuples excluded by the
+       <command>WHERE</command> clause of the <command>COPY</command>
+       command, and tuples that were not inserted as a result of trigger
+       behaviour). For <command>COPY TO</command> this includes all tuples
+       that exist in the table, or those that are returned by the
+       <command>SELECT</command>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>tuples_inserted</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of tuples inserted into the table with the
+       <command>COPY FROM</command> command. Is <literal>0</literal> for
+       <command>COPY TO</command>.
       </para></entry>
      </row>

@@ -6610,6 +6628,18 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
     </tbody>
    </tgroup>
   </table>
+  <note>
+   <para>
+    The <literal>tuples_excluded</literal> column does not count the tuples
+    that failed to insert, but only those tuples that were available in the
+    source material and not eligible for insertion through exclusion by the
+    <command>WHERE</command> clause. You can calculate the number of tuples
+    that failed to insert into the table due to triggers (or that otherwise
+    silently fail to insert) by subtracting both
+    <literal>tuples_excluded</literal> and <literal>tuples_inserted</literal>
+    from <literal>tuples_processed</literal>.
+   </para>
+  </note>
  </sect2>
 </sect1>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 8f9987f311..f59de36742 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1130,18 +1130,19 @@ CREATE VIEW pg_stat_progress_copy AS
     SELECT
         S.pid AS pid, S.datid AS datid, D.datname AS datname,
         S.relid AS relid,
-        CASE S.param5 WHEN 1 THEN 'COPY FROM'
+        CASE S.param1 WHEN 1 THEN 'COPY FROM'
                       WHEN 2 THEN 'COPY TO'
                       END AS command,
-        CASE S.param6 WHEN 1 THEN 'FILE'
+        CASE S.param2 WHEN 1 THEN 'FILE'
                       WHEN 2 THEN 'PROGRAM'
                       WHEN 3 THEN 'PIPE'
                       WHEN 4 THEN 'CALLBACK'
                       END AS "type",
-        S.param1 AS bytes_processed,
-        S.param2 AS bytes_total,
-        S.param3 AS tuples_processed,
-        S.param4 AS tuples_excluded
+        S.param3 AS bytes_processed,
+        S.param4 AS bytes_total,
+        S.param5 AS tuples_processed,
+        S.param6 AS tuples_inserted,
+        S.param7 AS tuples_excluded
     FROM pg_stat_get_progress_info('COPY') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index 8856bf215b..0aae5ae71b 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -540,6 +540,7 @@ CopyFrom(CopyFromState cstate)
     CopyInsertMethod insertMethod;
     CopyMultiInsertInfo multiInsertInfo = {0};    /* pacify compiler */
     int64       processed = 0;
+    int64       inserted = 0;
     int64       excluded = 0;
     bool        has_before_insert_row_trig;
     bool        has_instead_insert_row_trig;
@@ -854,6 +855,9 @@ CopyFrom(CopyFromState cstate)
         if (!NextCopyFrom(cstate, econtext, myslot->tts_values, myslot->tts_isnull))
             break;

+        /* We're starting processing the next tuple, so update processed */
+        ++processed;
+
         ExecStoreVirtualTuple(myslot);

         /*
@@ -871,8 +875,15 @@ CopyFrom(CopyFromState cstate)
             /* Skip items that don't match COPY's WHERE clause */
             if (!ExecQual(cstate->qualexpr, econtext))
             {
-                /* Report that this tuple was filtered out by the WHERE clause */
-                pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, ++excluded);
+                const int   progress_params[] = {
+                    PROGRESS_COPY_TUPLES_PROCESSED,
+                    PROGRESS_COPY_TUPLES_EXCLUDED
+                };
+                const int64 progress_vals[] = {
+                    processed,
+                    ++excluded
+                };
+                pgstat_progress_update_multi_param(2, progress_params, progress_vals);
                 continue;
             }
         }
@@ -1001,8 +1012,18 @@ CopyFrom(CopyFromState cstate)
             skip_tuple = true;    /* "do nothing" */
         }

-        if (!skip_tuple)
+        if (skip_tuple)
+            pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processed);
+        else
         {
+            const int   progress_params[] = {
+                PROGRESS_COPY_TUPLES_PROCESSED,
+                PROGRESS_COPY_TUPLES_INSERTED
+            };
+            int64       progress_vals[] = {
+                processed,
+                0
+            };
             /*
              * If there is an INSTEAD OF INSERT ROW trigger, let it handle the
              * tuple.  Otherwise, proceed with inserting the tuple into the
@@ -1073,7 +1094,11 @@ CopyFrom(CopyFromState cstate)
                                                   NULL);

                 if (myslot == NULL)    /* "do nothing" */
+                {
+                    /* `processed` was updated at the top of this iteration; report the progress */
+                    pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, processed);
                     continue;    /* next tuple please */
+                }

                 /*
                  * AFTER ROW Triggers might reference the tableoid
@@ -1112,7 +1137,8 @@ CopyFrom(CopyFromState cstate)
              * for counting tuples inserted by an INSERT command. Update
              * progress of the COPY command as well.
              */
-            pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
+            progress_vals[1] = ++inserted;
+            pgstat_progress_update_multi_param(2, progress_params, progress_vals);
         }
     }
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 7884a11383..6fa78a5405 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -134,12 +134,13 @@
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL    5

 /* Progress parameters for PROGRESS_COPY */
-#define PROGRESS_COPY_BYTES_PROCESSED 0
-#define PROGRESS_COPY_BYTES_TOTAL 1
-#define PROGRESS_COPY_TUPLES_PROCESSED 2
-#define PROGRESS_COPY_TUPLES_EXCLUDED 3
-#define PROGRESS_COPY_COMMAND 4
-#define PROGRESS_COPY_TYPE 5
+#define PROGRESS_COPY_COMMAND 0
+#define PROGRESS_COPY_TYPE 1
+#define PROGRESS_COPY_BYTES_PROCESSED 2
+#define PROGRESS_COPY_BYTES_TOTAL 3
+#define PROGRESS_COPY_TUPLES_PROCESSED 4
+#define PROGRESS_COPY_TUPLES_INSERTED 5
+#define PROGRESS_COPY_TUPLES_EXCLUDED 6

 /* Commands of PROGRESS_COPY_COMMAND */
 #define PROGRESS_COPY_COMMAND_FROM 1
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 02f5e7c905..8558fec72a 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1950,22 +1950,23 @@ pg_stat_progress_copy| SELECT s.pid,
     s.datid,
     d.datname,
     s.relid,
-        CASE s.param5
+        CASE s.param1
             WHEN 1 THEN 'COPY FROM'::text
             WHEN 2 THEN 'COPY TO'::text
             ELSE NULL::text
         END AS command,
-        CASE s.param6
+        CASE s.param2
             WHEN 1 THEN 'FILE'::text
             WHEN 2 THEN 'PROGRAM'::text
             WHEN 3 THEN 'PIPE'::text
             WHEN 4 THEN 'CALLBACK'::text
             ELSE NULL::text
         END AS type,
-    s.param1 AS bytes_processed,
-    s.param2 AS bytes_total,
-    s.param3 AS tuples_processed,
-    s.param4 AS tuples_excluded
+    s.param3 AS bytes_processed,
+    s.param4 AS bytes_total,
+    s.param5 AS tuples_processed,
+    s.param6 AS tuples_inserted,
+    s.param7 AS tuples_excluded
    FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1,
 param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
diff --git a/src/test/regress/output/copy.source b/src/test/regress/output/copy.source
index 6819c1334c..eb3f38ea9e 100644
--- a/src/test/regress/output/copy.source
+++ b/src/test/regress/output/copy.source
@@ -203,11 +203,11 @@ create trigger check_after_progress_reporting
     execute function notice_after_progress_reporting();
 -- reporting of STDIO imports, and correct bytes-processed/tuples-processed reporting
 copy progress_reporting from stdin;
-INFO: progress: {"type": "PIPE", "command": "COPY FROM", "datname": "regression", "bytes_total": 0, "bytes_processed": 79, "tuples_excluded": 0, "tuples_processed": 3}
+INFO: progress: {"type": "PIPE", "command": "COPY FROM", "datname": "regression", "bytes_total": 0, "bytes_processed": 79, "tuples_excluded": 0, "tuples_inserted": 3, "tuples_processed": 3}
 -- reporting of FILE imports, and correct reporting of tuples-excluded
 copy progress_reporting from '@abs_srcdir@/data/emp.data'
     where (salary < 2000);
-INFO: progress: {"type": "FILE", "command": "COPY FROM", "datname": "regression", "bytes_total": 79, "bytes_processed": 79, "tuples_excluded": 1, "tuples_processed": 2}
+INFO: progress: {"type": "FILE", "command": "COPY FROM", "datname": "regression", "bytes_total": 79, "bytes_processed": 79, "tuples_excluded": 1, "tuples_inserted": 2, "tuples_processed": 3}
 -- cleanup progress_reporting
 drop trigger check_after_progress_reporting on progress_reporting;
 drop function notice_after_progress_reporting();
--
2.20.1
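To illustrate the distinction 0003 draws between tuples_processed and tuples_inserted, here is a hypothetical session (table, function, and trigger names invented for the example) where a BEFORE INSERT trigger silently suppresses some rows:

```sql
-- A BEFORE INSERT row trigger returning NULL suppresses the insert;
-- with 0003 applied, tuples_processed still advances for such rows
-- while tuples_inserted does not.
create table demo_copy (a int);
create function drop_evens() returns trigger language plpgsql as $$
begin
    if new.a % 2 = 0 then
        return null;  -- silently drop even values
    end if;
    return new;
end $$;
create trigger demo_drop before insert on demo_copy
    for each row execute function drop_evens();
-- Feeding four rows (1..4) through COPY would leave the view showing
-- tuples_processed = 4, tuples_inserted = 2, tuples_excluded = 0.
copy demo_copy from stdin;
```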
From 9add598d220ca7fd14c86d90cb93daf890436896 Mon Sep 17 00:00:00 2001
From: Matthias van de Meent <boekew...@gmail.com>
Date: Fri, 12 Feb 2021 14:06:44 +0100
Subject: [PATCH v12 1/3] Additional progress-reported components for COPY
 progress reporting

The command, I/O target, and excluded tuple count (excluded by the
COPY ... FROM ... WHERE clause) are now reported in the
pg_stat_progress_copy view. Additionally, the column lines_processed
is renamed to tuples_processed to disambiguate the meaning of this
column in cases of CSV and BINARY copies, and to stay consistent with
the names in the other pg_stat_progress_* views.

Of special interest is the reporting of the copy type, with which we
can distinguish the progress of logical replication's initial table
synchronization workers without having to join the pg_stat_activity
view.
---
 doc/src/sgml/monitoring.sgml         | 42 +++++++++++++++++++++++++---
 src/backend/catalog/system_views.sql | 11 +++++++-
 src/backend/commands/copyfrom.c      | 27 ++++++++++++++++--
 src/backend/commands/copyto.c        | 22 ++++++++++++---
 src/include/commands/progress.h      | 17 +++++++++--
 src/test/regress/expected/rules.out  | 15 +++++++++-
 6 files changed, 119 insertions(+), 15 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 3513e127b7..32bebc70db 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6541,8 +6541,32 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,
       <structfield>relid</structfield> <type>oid</type>
      </para>
      <para>
-      OID of the table on which the <command>COPY</command> command is executed.
-      It is set to 0 if copying from a <command>SELECT</command> query.
+      OID of the table on which the <command>COPY</command> command is being
+      executed. It is set to 0 if copying from a <command>SELECT</command>
+      query.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>command</structfield> <type>text</type>
+      </para>
+      <para>
+       The command that is running: <literal>COPY FROM</literal>, or
+       <literal>COPY TO</literal>.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>type</structfield> <type>text</type>
+      </para>
+      <para>
+       The io type that the data is read from or written to:
+       <literal>FILE</literal>, <literal>PROGRAM</literal>,
+       <literal>PIPE</literal> (for <command>COPY FROM STDIN</command> and
+       <command>COPY TO STDOUT</command>), or <literal>CALLBACK</literal>
+       (used in the table synchronization background worker).
       </para></entry>
      </row>

@@ -6567,10 +6591,20 @@ SELECT pg_stat_get_backend_pid(s.backendid) AS pid,

      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
-       <structfield>lines_processed</structfield> <type>bigint</type>
+       <structfield>tuples_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of tuples already processed by <command>COPY</command> command.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>tuples_excluded</structfield> <type>bigint</type>
       </para>
       <para>
-       Number of lines already processed by <command>COPY</command> command.
+       Number of tuples not processed because they were excluded by the
+       <command>WHERE</command> clause of the <command>COPY</command> command.
      </para></entry>
     </row>
    </tbody>
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fc94a73a54..8f9987f311 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1130,9 +1130,18 @@ CREATE VIEW pg_stat_progress_copy AS
     SELECT
         S.pid AS pid, S.datid AS datid, D.datname AS datname,
         S.relid AS relid,
+        CASE S.param5 WHEN 1 THEN 'COPY FROM'
+                      WHEN 2 THEN 'COPY TO'
+                      END AS command,
+        CASE S.param6 WHEN 1 THEN 'FILE'
+                      WHEN 2 THEN 'PROGRAM'
+                      WHEN 3 THEN 'PIPE'
+                      WHEN 4 THEN 'CALLBACK'
+                      END AS "type",
         S.param1 AS bytes_processed,
         S.param2 AS bytes_total,
-        S.param3 AS lines_processed
+        S.param3 AS tuples_processed,
+        S.param4 AS tuples_excluded
     FROM pg_stat_get_progress_info('COPY') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c
index f05e2d2347..8856bf215b 100644
--- a/src/backend/commands/copyfrom.c
+++ b/src/backend/commands/copyfrom.c
@@ -539,7 +539,8 @@ CopyFrom(CopyFromState cstate)
     BulkInsertState bistate = NULL;
     CopyInsertMethod insertMethod;
     CopyMultiInsertInfo multiInsertInfo = {0};    /* pacify compiler */
-    uint64      processed = 0;
+    int64       processed = 0;
+    int64       excluded = 0;
     bool        has_before_insert_row_trig;
     bool        has_instead_insert_row_trig;
     bool        leafpart_use_multi_insert = false;
@@ -869,7 +870,11 @@ CopyFrom(CopyFromState cstate)
             econtext->ecxt_scantuple = myslot;
             /* Skip items that don't match COPY's WHERE clause */
             if (!ExecQual(cstate->qualexpr, econtext))
+            {
+                /* Report that this tuple was filtered out by the WHERE clause */
+                pgstat_progress_update_param(PROGRESS_COPY_TUPLES_EXCLUDED, ++excluded);
                 continue;
+            }
         }

         /* Determine the partition to insert the tuple into */
@@ -1107,7 +1112,7 @@ CopyFrom(CopyFromState cstate)
              * for counting tuples inserted by an INSERT command. Update
              * progress of the COPY command as well.
              */
-            pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+            pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
         }
     }
@@ -1193,6 +1198,16 @@ BeginCopyFrom(ParseState *pstate,
     ExprState **defexprs;
     MemoryContext oldcontext;
     bool        volatile_defexprs;
+    const int   progress_cols[] = {
+        PROGRESS_COPY_COMMAND,
+        PROGRESS_COPY_TYPE,
+        PROGRESS_COPY_BYTES_TOTAL
+    };
+    int64       progress_vals[] = {
+        PROGRESS_COPY_COMMAND_FROM,
+        0,
+        0
+    };

     /* Allocate workspace and zero all fields */
     cstate = (CopyFromStateData *) palloc0(sizeof(CopyFromStateData));
@@ -1430,11 +1445,13 @@ BeginCopyFrom(ParseState *pstate,

     if (data_source_cb)
     {
+        progress_vals[1] = PROGRESS_COPY_TYPE_CALLBACK;
         cstate->copy_src = COPY_CALLBACK;
         cstate->data_source_cb = data_source_cb;
     }
     else if (pipe)
     {
+        progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
         Assert(!is_program);    /* the grammar does not allow this */
         if (whereToSendOutput == DestRemote)
             ReceiveCopyBegin(cstate);
@@ -1447,6 +1464,7 @@ BeginCopyFrom(ParseState *pstate,

         if (cstate->is_program)
         {
+            progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_R);
             if (cstate->copy_file == NULL)
                 ereport(ERROR,
@@ -1458,6 +1476,7 @@ BeginCopyFrom(ParseState *pstate,
         {
             struct stat st;

+            progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
             cstate->copy_file = AllocateFile(cstate->filename, PG_BINARY_R);
             if (cstate->copy_file == NULL)
             {
@@ -1484,10 +1503,12 @@ BeginCopyFrom(ParseState *pstate,
                         (errcode(ERRCODE_WRONG_OBJECT_TYPE),
                          errmsg("\"%s\" is a directory", cstate->filename)));

-            pgstat_progress_update_param(PROGRESS_COPY_BYTES_TOTAL, st.st_size);
+            progress_vals[2] = st.st_size;
         }
     }

+    pgstat_progress_update_multi_param(3, progress_cols, progress_vals);
+
     if (cstate->opts.binary)
     {
         /* Read and verify binary header */
diff --git a/src/backend/commands/copyto.c b/src/backend/commands/copyto.c
index 46155015cf..3b5a20a065 100644
--- a/src/backend/commands/copyto.c
+++ b/src/backend/commands/copyto.c
@@ -353,6 +353,14 @@ BeginCopyTo(ParseState *pstate,
     TupleDesc   tupDesc;
     int         num_phys_attrs;
     MemoryContext oldcontext;
+    const int   progress_cols[] = {
+        PROGRESS_COPY_COMMAND,
+        PROGRESS_COPY_TYPE
+    };
+    int64       progress_vals[] = {
+        PROGRESS_COPY_COMMAND_TO,
+        0
+    };

     if (rel != NULL && rel->rd_rel->relkind != RELKIND_RELATION)
     {
@@ -659,6 +667,8 @@ BeginCopyTo(ParseState *pstate,

     if (pipe)
     {
+        progress_vals[1] = PROGRESS_COPY_TYPE_PIPE;
+
         Assert(!is_program);    /* the grammar does not allow this */
         if (whereToSendOutput != DestRemote)
             cstate->copy_file = stdout;
@@ -670,6 +680,7 @@ BeginCopyTo(ParseState *pstate,

         if (is_program)
         {
+            progress_vals[1] = PROGRESS_COPY_TYPE_PROGRAM;
             cstate->copy_file = OpenPipeStream(cstate->filename, PG_BINARY_W);
             if (cstate->copy_file == NULL)
                 ereport(ERROR,
@@ -682,6 +693,7 @@ BeginCopyTo(ParseState *pstate,
             mode_t      oumask;    /* Pre-existing umask value */
             struct stat st;

+            progress_vals[1] = PROGRESS_COPY_TYPE_FILE;
             /*
              * Prevent write to relative path ... too easy to shoot oneself in
              * the foot by overwriting a database file ...
@@ -731,6 +743,8 @@ BeginCopyTo(ParseState *pstate,
     /* initialize progress */
     pgstat_progress_start_command(PROGRESS_COMMAND_COPY,
                                   cstate->rel ?
                                   RelationGetRelid(cstate->rel) : InvalidOid);

+    pgstat_progress_update_multi_param(2, progress_cols, progress_vals);
+
     cstate->bytes_processed = 0;

     MemoryContextSwitchTo(oldcontext);
@@ -881,8 +895,8 @@ DoCopyTo(CopyToState cstate)
         /* Format and send the data */
         CopyOneRowTo(cstate, slot);

-        /* Increment amount of processed tuples and update the progress */
-        pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++processed);
+        /* Increment the number of processed tuples, and report the progress */
+        pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++processed);
     }

     ExecDropSingleTupleTableSlot(slot);
@@ -1251,8 +1265,8 @@ copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
     /* Send the data */
     CopyOneRowTo(cstate, slot);

-    /* Increment amount of processed tuples and update the progress */
-    pgstat_progress_update_param(PROGRESS_COPY_LINES_PROCESSED, ++myState->processed);
+    /* Increment the number of processed tuples, and report the progress */
+    pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED, ++myState->processed);

     return true;
 }
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index 95ec5d02e9..7884a11383 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -133,9 +133,22 @@
 #define PROGRESS_BASEBACKUP_PHASE_WAIT_WAL_ARCHIVE    4
 #define PROGRESS_BASEBACKUP_PHASE_TRANSFER_WAL    5

-/* Commands of PROGRESS_COPY */
+/* Progress parameters for PROGRESS_COPY */
 #define PROGRESS_COPY_BYTES_PROCESSED 0
 #define PROGRESS_COPY_BYTES_TOTAL 1
-#define PROGRESS_COPY_LINES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_PROCESSED 2
+#define PROGRESS_COPY_TUPLES_EXCLUDED 3
+#define PROGRESS_COPY_COMMAND 4
+#define PROGRESS_COPY_TYPE 5
+
+/* Commands of PROGRESS_COPY_COMMAND */
+#define PROGRESS_COPY_COMMAND_FROM 1
+#define PROGRESS_COPY_COMMAND_TO 2
+
+/* Types of PROGRESS_COPY_TYPE */
+#define PROGRESS_COPY_TYPE_FILE 1
+#define PROGRESS_COPY_TYPE_PROGRAM 2
+#define PROGRESS_COPY_TYPE_PIPE 3
+#define PROGRESS_COPY_TYPE_CALLBACK 4

 #endif
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index b1c9b7bdfe..02f5e7c905 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1950,9 +1950,22 @@ pg_stat_progress_copy| SELECT s.pid,
     s.datid,
     d.datname,
     s.relid,
+        CASE s.param5
+            WHEN 1 THEN 'COPY FROM'::text
+            WHEN 2 THEN 'COPY TO'::text
+            ELSE NULL::text
+        END AS command,
+        CASE s.param6
+            WHEN 1 THEN 'FILE'::text
+            WHEN 2 THEN 'PROGRAM'::text
+            WHEN 3 THEN 'PIPE'::text
+            WHEN 4 THEN 'CALLBACK'::text
+            ELSE NULL::text
+        END AS type,
     s.param1 AS bytes_processed,
     s.param2 AS bytes_total,
-    s.param3 AS lines_processed
+    s.param3 AS tuples_processed,
+    s.param4 AS tuples_excluded
    FROM (pg_stat_get_progress_info('COPY'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20) LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_progress_create_index| SELECT s.pid,
--
2.20.1
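As a usage sketch of the type column introduced by this patch: the CALLBACK value lets a monitoring session single out logical replication's initial table-sync workers directly from the view (query is illustrative, written against the view as defined above):

```sql
-- Spot initial table synchronization workers without joining
-- pg_stat_activity: they are the only COPY consumers of type CALLBACK.
SELECT pid, relid::regclass, bytes_processed, tuples_processed
FROM pg_stat_progress_copy
WHERE type = 'CALLBACK';
```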
From c26e13408c97dfd469f753a7341730f543ca0084 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Josef=20=C5=A0im=C3=A1nek?= <josef.sima...@gmail.com>
Date: Tue, 9 Feb 2021 13:06:45 +0100
Subject: [PATCH v12 2/3] Add copy progress reporting regression tests.
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This tests some basic features of copy progress reporting. Sadly, we
can only request one snapshot of progress_reporting each transaction /
statement, so we can't (easily) get intermediate results without each
result requiring a separate statement being run.

Author: Josef Šimánek, Matthias van de Meent
---
 src/test/regress/input/copy.source  | 60 +++++++++++++++++++++++++++++
 src/test/regress/output/copy.source | 47 ++++++++++++++++++++++
 2 files changed, 107 insertions(+)

diff --git a/src/test/regress/input/copy.source b/src/test/regress/input/copy.source
index a1d529ad36..4f1cbc73d2 100644
--- a/src/test/regress/input/copy.source
+++ b/src/test/regress/input/copy.source
@@ -201,3 +201,63 @@ select * from parted_copytest where b = 1;
 select * from parted_copytest where b = 2;

 drop table parted_copytest;
+
+--
+-- progress reporting
+--
+
+-- setup
+-- reuse employer datatype, that has a small sized data set
+
+create table progress_reporting (
+	name text,
+	age int4,
+	location point,
+	salary int4,
+	manager name
+);
+
+-- Add a trigger to 'raise info' the contents of pg_stat_progress_copy. This
+-- allows us to test and verify the contents of this view, which would
+-- otherwise require a concurrent session checking that table.
+create function notice_after_progress_reporting() returns trigger AS
+$$
+declare report record;
+begin
+	-- We cannot expect 'pid' nor 'relid' to be consistent over runs due to
+	-- variance in system process ids, and concurrency in runs of tests.
+	-- Additionally, due to the usage of this test in pg_regress, the 'datid'
+	-- also is not consistent between runs.
+	select into report (to_jsonb(r) - '{pid,relid,datid}'::text[]) as value
+		from pg_stat_progress_copy r
+		where pid = pg_backend_pid();
+
+	raise info 'progress: %', report.value::text;
+
+	RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+
+create trigger check_after_progress_reporting
+	after insert on progress_reporting
+	for each statement
+	execute function notice_after_progress_reporting();
+
+-- reporting of STDIO imports, and correct bytes-processed/tuples-processed reporting
+
+copy progress_reporting from stdin;
+sharon	25	(15,12)	1000	sam
+sam	30	(10,5)	2000	bill
+bill	20	(11,10)	1000	sharon
+\.
+
+-- reporting of FILE imports, and correct reporting of tuples-excluded
+
+copy progress_reporting from '@abs_srcdir@/data/emp.data'
+	where (salary < 2000);
+
+-- cleanup progress_reporting
+
+drop trigger check_after_progress_reporting on progress_reporting;
+drop function notice_after_progress_reporting();
+drop table progress_reporting;
diff --git a/src/test/regress/output/copy.source b/src/test/regress/output/copy.source
index 938d3551da..6819c1334c 100644
--- a/src/test/regress/output/copy.source
+++ b/src/test/regress/output/copy.source
@@ -165,3 +165,50 @@ select * from parted_copytest where b = 2;
 (1 row)

 drop table parted_copytest;
+--
+-- progress reporting
+--
+-- setup
+-- reuse employer datatype, that has a small sized data set
+create table progress_reporting (
+	name text,
+	age int4,
+	location point,
+	salary int4,
+	manager name
+);
+-- Add a trigger to 'raise info' the contents of pg_stat_progress_copy. This
+-- allows us to test and verify the contents of this view, which would
+-- otherwise require a concurrent session checking that table.
+create function notice_after_progress_reporting() returns trigger AS
+$$
+declare report record;
+begin
+	-- We cannot expect 'pid' nor 'relid' to be consistent over runs due to
+	-- variance in system process ids, and concurrency in runs of tests.
+	-- Additionally, due to the usage of this test in pg_regress, the 'datid'
+	-- also is not consistent between runs.
+	select into report (to_jsonb(r) - '{pid,relid,datid}'::text[]) as value
+		from pg_stat_progress_copy r
+		where pid = pg_backend_pid();
+
+	raise info 'progress: %', report.value::text;
+
+	RETURN NEW;
+END;
+$$ LANGUAGE plpgsql;
+create trigger check_after_progress_reporting
+	after insert on progress_reporting
+	for each statement
+	execute function notice_after_progress_reporting();
+-- reporting of STDIO imports, and correct bytes-processed/tuples-processed reporting
+copy progress_reporting from stdin;
+INFO: progress: {"type": "PIPE", "command": "COPY FROM", "datname": "regression", "bytes_total": 0, "bytes_processed": 79, "tuples_excluded": 0, "tuples_processed": 3}
+-- reporting of FILE imports, and correct reporting of tuples-excluded
+copy progress_reporting from '@abs_srcdir@/data/emp.data'
+	where (salary < 2000);
+INFO: progress: {"type": "FILE", "command": "COPY FROM", "datname": "regression", "bytes_total": 79, "bytes_processed": 79, "tuples_excluded": 1, "tuples_processed": 2}
+-- cleanup progress_reporting
+drop trigger check_after_progress_reporting on progress_reporting;
+drop function notice_after_progress_reporting();
+drop table progress_reporting;
--
2.20.1