On Wed, Apr 12, 2023 at 9:22 PM Imseih (AWS), Sami <sims...@amazon.com> wrote:
>
> > This should be OK (also checked the code paths where the reports are
> > added). Note that the patch needed a few adjustments for its
> > indentation.
>
> Thanks for the formatting corrections! This looks good to me.

Thank you for updating the patch. It looks good to me too. I've
updated the commit message.

Regards,

--
Masahiko Sawada
Amazon Web Services: https://aws.amazon.com
From f92929f21d4052cfbc3b068ca5b8be954741da3d Mon Sep 17 00:00:00 2001
From: Michael Paquier <michael@paquier.xyz>
Date: Wed, 12 Apr 2023 13:45:59 +0900
Subject: [PATCH v30] Report index vacuum progress.

This commit adds two columns: indexes_total and indexes_processed, to
pg_stat_progress_vacuum system view to show the index vacuum
progress. These numbers are reported in the "vacuuming indexes" and
"cleaning up indexes" phases.

It also adds a new type of parallel message, 'P'. Which is used to
convey the progress updates made by parallel workers to the leader
process. Therefore, the parallel workers' progress updates are
reflected in an asynchronous manner. Currently it supports only
incremental progress reporting but it's possible to allow for
supporting of other backend process APIs in the future.

Authors: Sami Imseih
Reviewed by: Masahiko Sawada, Michael Paquier, Nathan Bossart, Andres Freund
Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228@amazon.com
---
 doc/src/sgml/monitoring.sgml                  | 23 ++++++
 src/backend/access/heap/vacuumlazy.c          | 70 ++++++++++++++++---
 src/backend/access/transam/parallel.c         | 18 +++++
 src/backend/catalog/system_views.sql          |  3 +-
 src/backend/commands/vacuumparallel.c         |  9 ++-
 src/backend/utils/activity/backend_progress.c | 32 +++++++++
 src/include/commands/progress.h               |  2 +
 src/include/utils/backend_progress.h          |  1 +
 src/test/regress/expected/rules.out           |  4 +-
 9 files changed, 150 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 506aeaa879..588b720f57 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -6110,6 +6110,29 @@ FROM pg_stat_get_backend_idset() AS backendid;
        Number of dead tuples collected since the last index vacuum cycle.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Total number of indexes that will be vacuumed or cleaned up. This
+       number is reported at the beginning of the
+       <literal>vacuuming indexes</literal> phase or the
+       <literal>cleaning up indexes</literal> phase.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of indexes processed. This counter only advances when the
+       phase is <literal>vacuuming indexes</literal> or
+       <literal>cleaning up indexes</literal>.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 4eb953f904..6a41ee635d 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2319,6 +2319,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 {
 	bool		allindexes = true;
 	double		old_live_tuples = vacrel->rel->rd_rel->reltuples;
+	const int	progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEXES_TOTAL
+	};
+	const int	progress_end_index[] = {
+		PROGRESS_VACUUM_INDEXES_TOTAL,
+		PROGRESS_VACUUM_INDEXES_PROCESSED,
+		PROGRESS_VACUUM_NUM_INDEX_VACUUMS
+	};
+	int64		progress_start_val[2];
+	int64		progress_end_val[3];
 
 	Assert(vacrel->nindexes > 0);
 	Assert(vacrel->do_index_vacuuming);
@@ -2331,9 +2342,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 		return false;
 	}
 
-	/* Report that we are now vacuuming indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+	/*
+	 * Report that we are now vacuuming indexes and the number of indexes to
+	 * vacuum.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_INDEX;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2346,6 +2361,10 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 														  old_live_tuples,
 														  vacrel);
 
+			/* Report the number of indexes vacuumed */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_PROCESSED,
+										 idx + 1);
+
 			if (lazy_check_wraparound_failsafe(vacrel))
 			{
 				/* Wraparound emergency -- end current index scan */
@@ -2380,14 +2399,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	Assert(allindexes || VacuumFailsafeActive);
 
 	/*
-	 * Increase and report the number of index scans.
+	 * Increase and report the number of index scans.  Also, we reset
+	 * PROGRESS_VACUUM_INDEXES_TOTAL and PROGRESS_VACUUM_INDEXES_PROCESSED.
 	 *
 	 * We deliberately include the case where we started a round of bulk
 	 * deletes that we weren't able to finish due to the failsafe triggering.
 	 */
 	vacrel->num_index_scans++;
-	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS,
-								 vacrel->num_index_scans);
+	progress_end_val[0] = 0;
+	progress_end_val[1] = 0;
+	progress_end_val[2] = vacrel->num_index_scans;
+	pgstat_progress_update_multi_param(3, progress_end_index, progress_end_val);
 
 	return allindexes;
 }
@@ -2624,6 +2646,12 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 
 	if (unlikely(vacuum_xid_failsafe_check(&vacrel->cutoffs)))
 	{
+		const int	progress_index[] = {
+			PROGRESS_VACUUM_INDEXES_TOTAL,
+			PROGRESS_VACUUM_INDEXES_PROCESSED
+		};
+		int64		progress_val[2] = {0, 0};
+
 		VacuumFailsafeActive = true;
 
 		/*
@@ -2638,6 +2666,9 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 		vacrel->do_index_cleanup = false;
 		vacrel->do_rel_truncate = false;
 
+		/* Reset the progress counters */
+		pgstat_progress_update_multi_param(2, progress_index, progress_val);
+
 		ereport(WARNING,
 				(errmsg("bypassing nonessential maintenance of table \"%s.%s.%s\" as a failsafe after %d index scans",
 						vacrel->dbname, vacrel->relnamespace, vacrel->relname,
@@ -2664,13 +2695,27 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
 	double		reltuples = vacrel->new_rel_tuples;
 	bool		estimated_count = vacrel->scanned_pages < vacrel->rel_pages;
+	const int	progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEXES_TOTAL
+	};
+	const int	progress_end_index[] = {
+		PROGRESS_VACUUM_INDEXES_TOTAL,
+		PROGRESS_VACUUM_INDEXES_PROCESSED
+	};
+	int64		progress_start_val[2];
+	int64		progress_end_val[2] = {0, 0};
 
 	Assert(vacrel->do_index_cleanup);
 	Assert(vacrel->nindexes > 0);
 
-	/* Report that we are now cleaning up indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
+	/*
+	 * Report that we are now cleaning up indexes and the number of indexes to
+	 * cleanup.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_INDEX_CLEANUP;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2682,6 +2727,10 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 			vacrel->indstats[idx] =
 				lazy_cleanup_one_index(indrel, istat, reltuples,
 									   estimated_count, vacrel);
+
+			/* Report the number of indexes cleaned up */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEXES_PROCESSED,
+										 idx + 1);
 		}
 	}
 	else
@@ -2691,6 +2740,9 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 											vacrel->num_index_scans,
 											estimated_count);
 	}
+
+	/* Reset the progress counters */
+	pgstat_progress_update_multi_param(2, progress_end_index, progress_end_val);
 }
 
 /*
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index 2b8bc2f58d..2bd04bd177 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -24,6 +24,7 @@
 #include "catalog/pg_enum.h"
 #include "catalog/storage.h"
 #include "commands/async.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "executor/execParallel.h"
 #include "libpq/libpq.h"
@@ -1199,6 +1200,23 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 				break;
 			}
 
+		case 'P':				/* Parallel progress reporting */
+			{
+				/*
+				 * Only incremental progress reporting is currently supported.
+				 * However, it's possible to add more fields to the message to
+				 * allow for handling of other backend progress APIs.
+				 */
+				int			index = pq_getmsgint(msg, 4);
+				int64		incr = pq_getmsgint64(msg);
+
+				pq_getmsgend(msg);
+
+				pgstat_progress_incr_param(index, incr);
+
+				break;
+			}
+
 		case 'X':				/* Terminate, indicating clean exit */
 			{
 				shm_mq_detach(pcxt->worker[i].error_mqh);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index c18fea8362..af65af6bdd 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1192,7 +1192,8 @@ CREATE VIEW pg_stat_progress_vacuum AS
                       END AS phase,
         S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned,
         S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count,
-        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples
+        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples,
+        S.param8 AS indexes_total, S.param9 AS indexes_processed
     FROM pg_stat_get_progress_info('VACUUM') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index a79067fd46..351ab4957a 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -30,6 +30,7 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/index.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "optimizer/paths.h"
 #include "pgstat.h"
@@ -631,7 +632,7 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
 													vacuum));
 	}
 
-	/* Reset the parallel index processing counter */
+	/* Reset the parallel index processing and progress counters */
 	pg_atomic_write_u32(&(pvs->shared->idx), 0);
 
 	/* Setup the shared cost-based vacuum delay and launch workers */
@@ -902,6 +903,12 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
 	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
 	pfree(pvs->indname);
 	pvs->indname = NULL;
+
+	/*
+	 * Call the parallel variant of pgstat_progress_incr_param so workers can
+	 * report progress of index vacuum to the leader.
+	 */
+	pgstat_progress_parallel_incr_param(PROGRESS_VACUUM_INDEXES_PROCESSED, 1);
 }
 
 /*
diff --git a/src/backend/utils/activity/backend_progress.c b/src/backend/utils/activity/backend_progress.c
index fb48eafef9..67447ef03a 100644
--- a/src/backend/utils/activity/backend_progress.c
+++ b/src/backend/utils/activity/backend_progress.c
@@ -10,6 +10,8 @@
  */
 #include "postgres.h"
 
+#include "access/parallel.h"
+#include "libpq/pqformat.h"
 #include "port/atomics.h"		/* for memory barriers */
 #include "utils/backend_progress.h"
 #include "utils/backend_status.h"
@@ -79,6 +81,36 @@ pgstat_progress_incr_param(int index, int64 incr)
 	PGSTAT_END_WRITE_ACTIVITY(beentry);
 }
 
+/*-----------
+ * pgstat_progress_parallel_incr_param() -
+ *
+ * A variant of pgstat_progress_incr_param to allow a worker to poke at
+ * a leader to do an incremental progress update.
+ *-----------
+ */
+void
+pgstat_progress_parallel_incr_param(int index, int64 incr)
+{
+	/*
+	 * Parallel workers notify a leader through a 'P' protocol message to
+	 * update progress, passing the progress index and incremented value.
+	 * Leaders can just call pgstat_progress_incr_param directly.
+	 */
+	if (IsParallelWorker())
+	{
+		static StringInfoData progress_message;
+
+		initStringInfo(&progress_message);
+
+		pq_beginmessage(&progress_message, 'P');
+		pq_sendint32(&progress_message, index);
+		pq_sendint64(&progress_message, incr);
+		pq_endmessage(&progress_message);
+	}
+	else
+		pgstat_progress_incr_param(index, incr);
+}
+
 /*-----------
  * pgstat_progress_update_multi_param() -
  *
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index e5add41352..2478e87425 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -25,6 +25,8 @@
 #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS		4
 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES			5
 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES			6
+#define PROGRESS_VACUUM_INDEXES_TOTAL			7
+#define PROGRESS_VACUUM_INDEXES_PROCESSED		8
 
 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */
 #define PROGRESS_VACUUM_PHASE_SCAN_HEAP			1
diff --git a/src/include/utils/backend_progress.h b/src/include/utils/backend_progress.h
index a84752ade9..70dea55fc0 100644
--- a/src/include/utils/backend_progress.h
+++ b/src/include/utils/backend_progress.h
@@ -37,6 +37,7 @@ extern void pgstat_progress_start_command(ProgressCommandType cmdtype,
 										  Oid relid);
 extern void pgstat_progress_update_param(int index, int64 val);
 extern void pgstat_progress_incr_param(int index, int64 incr);
+extern void pgstat_progress_parallel_incr_param(int index, int64 incr);
 extern void pgstat_progress_update_multi_param(int nparam, const int *index,
 											   const int64 *val);
 extern void pgstat_progress_end_command(void);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 7fd81e6a7d..e07afcd4aa 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2044,7 +2044,9 @@ pg_stat_progress_vacuum| SELECT s.pid,
     s.param4 AS heap_blks_vacuumed,
     s.param5 AS index_vacuum_count,
     s.param6 AS max_dead_tuples,
-    s.param7 AS num_dead_tuples
+    s.param7 AS num_dead_tuples,
+    s.param8 AS indexes_total,
+    s.param9 AS indexes_processed
    FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_recovery_prefetch| SELECT stats_reset,
-- 
2.31.1

Reply via email to