From 3e146938fca29e2ace9bb86f60ac346f4312534e Mon Sep 17 00:00:00 2001
From: "Imseih (AWS)" <simseih@88665a22795f.ant.amazon.com>
Date: Fri, 17 Feb 2023 19:34:02 -0600
Subject: [PATCH 1/1] Report index vacuum progress.

Add 2 new columns to pg_stat_progress_vacuum.
The columns are ndexes_total as the total indexes to be vacuumed or cleaned and
indexes_processed as the number of indexes vacuumed or cleaned up so far.

Authors: Sami Imseih
Reviewed by: Masahiko Sawada, Nathan Bossart, Andres Freund
Discussion: https://www.postgresql.org/message-id/flat/5478DFCD-2333-401A-B2F0-0D186AB09228@amazon.com
---
 doc/src/sgml/monitoring.sgml          | 21 ++++++++
 src/backend/access/heap/vacuumlazy.c  | 70 +++++++++++++++++++++++----
 src/backend/access/transam/parallel.c |  9 ++++
 src/backend/catalog/system_views.sql  |  3 +-
 src/backend/commands/vacuumparallel.c | 45 ++++++++++++++++-
 src/include/access/parallel.h         |  3 ++
 src/include/commands/progress.h       |  2 +
 src/include/commands/vacuum.h         |  1 +
 src/test/regress/expected/rules.out   |  4 +-
 9 files changed, 146 insertions(+), 12 deletions(-)

diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index b0b997f092..02ebb718d7 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -7183,6 +7183,27 @@ FROM pg_stat_get_backend_idset() AS backendid;
        Number of dead tuples collected since the last index vacuum cycle.
       </para></entry>
      </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_total</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Total number of indexes that will be vacuumed or cleaned up. This number is
+       reported as of the beginning of the <literal>vacuuming indexes</literal> phase
+       or the <literal>cleaning up indexes</literal> phase.
+      </para></entry>
+     </row>
+
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>indexes_processed</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of indexes processed. This counter only advances when the phase is
+       <literal>vacuuming indexes</literal> or <literal>cleaning up indexes</literal>.
+      </para></entry>
+     </row>
     </tbody>
    </tgroup>
   </table>
diff --git a/src/backend/access/heap/vacuumlazy.c b/src/backend/access/heap/vacuumlazy.c
index 8f14cf85f3..443a44d6c9 100644
--- a/src/backend/access/heap/vacuumlazy.c
+++ b/src/backend/access/heap/vacuumlazy.c
@@ -2316,6 +2316,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 {
 	bool		allindexes = true;
 	double		old_live_tuples = vacrel->rel->rd_rel->reltuples;
+	const int progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEX_TOTAL
+	};
+	const int progress_end_index[] = {
+		PROGRESS_VACUUM_INDEX_TOTAL,
+		PROGRESS_VACUUM_INDEX_PROCESSED,
+		PROGRESS_VACUUM_NUM_INDEX_VACUUMS
+	};
+	int64       progress_start_val[2];
+	int64       progress_end_val[3];
 
 	Assert(vacrel->nindexes > 0);
 	Assert(vacrel->do_index_vacuuming);
@@ -2328,9 +2339,13 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 		return false;
 	}
 
-	/* Report that we are now vacuuming indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_VACUUM_INDEX);
+	/*
+	 * Report that we are now vacuuming indexes and the number of indexes
+	 * to vacuum.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_VACUUM_INDEX;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2343,6 +2358,10 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 														  old_live_tuples,
 														  vacrel);
 
+			/* Report the number of indexes vacuumed */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_PROCESSED,
+										 idx + 1);
+
 			if (lazy_check_wraparound_failsafe(vacrel))
 			{
 				/* Wraparound emergency -- end current index scan */
@@ -2377,14 +2396,17 @@ lazy_vacuum_all_indexes(LVRelState *vacrel)
 	Assert(allindexes || vacrel->failsafe_active);
 
 	/*
-	 * Increase and report the number of index scans.
+	 * Increase and report the number of index scans. Also, we reset the progress
+	 * counters.
 	 *
 	 * We deliberately include the case where we started a round of bulk
 	 * deletes that we weren't able to finish due to the failsafe triggering.
 	 */
 	vacrel->num_index_scans++;
-	pgstat_progress_update_param(PROGRESS_VACUUM_NUM_INDEX_VACUUMS,
-								 vacrel->num_index_scans);
+	progress_end_val[0] = 0;
+	progress_end_val[1] = 0;
+	progress_end_val[2] = vacrel->num_index_scans;
+	pgstat_progress_update_multi_param(3, progress_end_index, progress_end_val);
 
 	return allindexes;
 }
@@ -2621,6 +2643,12 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 
 	if (unlikely(vacuum_xid_failsafe_check(&vacrel->cutoffs)))
 	{
+		const int   progress_index[] = {
+			PROGRESS_VACUUM_INDEX_TOTAL,
+			PROGRESS_VACUUM_INDEX_PROCESSED
+		};
+		int64       progress_val[2] = {0, 0};
+
 		vacrel->failsafe_active = true;
 
 		/* Disable index vacuuming, index cleanup, and heap rel truncation */
@@ -2628,6 +2656,9 @@ lazy_check_wraparound_failsafe(LVRelState *vacrel)
 		vacrel->do_index_cleanup = false;
 		vacrel->do_rel_truncate = false;
 
+		/* Reset the progress counters */
+		pgstat_progress_update_multi_param(2, progress_index, progress_val);
+
 		ereport(WARNING,
 				(errmsg("bypassing nonessential maintenance of table \"%s.%s.%s\" as a failsafe after %d index scans",
 						vacrel->dbname, vacrel->relnamespace, vacrel->relname,
@@ -2654,13 +2685,27 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 {
 	double		reltuples = vacrel->new_rel_tuples;
 	bool		estimated_count = vacrel->scanned_pages < vacrel->rel_pages;
+	const int progress_start_index[] = {
+		PROGRESS_VACUUM_PHASE,
+		PROGRESS_VACUUM_INDEX_TOTAL
+	};
+	const int progress_end_index[] = {
+		PROGRESS_VACUUM_INDEX_TOTAL,
+		PROGRESS_VACUUM_INDEX_PROCESSED
+	};
+	int64       progress_start_val[2];
+	int64       progress_end_val[2] = {0, 0};
 
 	Assert(vacrel->do_index_cleanup);
 	Assert(vacrel->nindexes > 0);
 
-	/* Report that we are now cleaning up indexes */
-	pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
-								 PROGRESS_VACUUM_PHASE_INDEX_CLEANUP);
+	/*
+	 * Report that we are now cleaning up indexes and the number of indexes
+	 * to cleanup.
+	 */
+	progress_start_val[0] = PROGRESS_VACUUM_PHASE_INDEX_CLEANUP;
+	progress_start_val[1] = vacrel->nindexes;
+	pgstat_progress_update_multi_param(2, progress_start_index, progress_start_val);
 
 	if (!ParallelVacuumIsActive(vacrel))
 	{
@@ -2672,6 +2717,10 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 			vacrel->indstats[idx] =
 				lazy_cleanup_one_index(indrel, istat, reltuples,
 									   estimated_count, vacrel);
+
+			/* Report the number of indexes cleaned up */
+			pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_PROCESSED,
+										 idx + 1);
 		}
 	}
 	else
@@ -2681,6 +2730,9 @@ lazy_cleanup_all_indexes(LVRelState *vacrel)
 											vacrel->num_index_scans,
 											estimated_count);
 	}
+
+	/* Reset the progress counters */
+	pgstat_progress_update_multi_param(2, progress_end_index, progress_end_val);
 }
 
 /*
diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c
index b26f2a64fb..b0c406fe7a 100644
--- a/src/backend/access/transam/parallel.c
+++ b/src/backend/access/transam/parallel.c
@@ -1199,6 +1199,15 @@ HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
 				break;
 			}
 
+		case 'P':				/* Parallel progress reporting */
+			{
+				/* Call the progress reporting callback */
+				Assert(pcxt->parallel_progress_callback);
+				pcxt->parallel_progress_callback(pcxt->parallel_progress_callback_arg);
+
+				break;
+			}
+
 		case 'X':				/* Terminate, indicating clean exit */
 			{
 				shm_mq_detach(pcxt->worker[i].error_mqh);
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 34ca0e739f..5886ee8b7c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1180,7 +1180,8 @@ CREATE VIEW pg_stat_progress_vacuum AS
                       END AS phase,
         S.param2 AS heap_blks_total, S.param3 AS heap_blks_scanned,
         S.param4 AS heap_blks_vacuumed, S.param5 AS index_vacuum_count,
-        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples
+        S.param6 AS max_dead_tuples, S.param7 AS num_dead_tuples,
+        S.param8 AS indexes_total, S.param9 AS indexes_processed
     FROM pg_stat_get_progress_info('VACUUM') AS S
         LEFT JOIN pg_database D ON S.datid = D.oid;
 
diff --git a/src/backend/commands/vacuumparallel.c b/src/backend/commands/vacuumparallel.c
index bcd40c80a1..57dc5fd8f0 100644
--- a/src/backend/commands/vacuumparallel.c
+++ b/src/backend/commands/vacuumparallel.c
@@ -30,7 +30,9 @@
 #include "access/table.h"
 #include "access/xact.h"
 #include "catalog/index.h"
+#include "commands/progress.h"
 #include "commands/vacuum.h"
+#include "libpq/libpq.h"
 #include "optimizer/paths.h"
 #include "pgstat.h"
 #include "storage/bufmgr.h"
@@ -103,6 +105,14 @@ typedef struct PVShared
 
 	/* Counter for vacuuming and cleanup */
 	pg_atomic_uint32 idx;
+
+	/*
+	 * Number of indexes processed in a parallel index bulk-deletion or a
+	 * parallel index cleanup. This counter is used to report the progress
+	 * information.
+	 */
+	pg_atomic_uint32 nindexes_processed;
+
 } PVShared;
 
 /* Status used during parallel index vacuum or cleanup */
@@ -271,6 +281,11 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	pcxt = CreateParallelContext("postgres", "parallel_vacuum_main",
 								 parallel_workers);
 	Assert(pcxt->nworkers > 0);
+
+	/* Setup callback for updating the progress information */
+	pcxt->parallel_progress_callback = parallel_vacuum_update_progress;
+	pcxt->parallel_progress_callback_arg = pvs;
+
 	pvs->pcxt = pcxt;
 
 	/* Estimate size for index vacuum stats -- PARALLEL_VACUUM_KEY_INDEX_STATS */
@@ -364,6 +379,7 @@ parallel_vacuum_init(Relation rel, Relation *indrels, int nindexes,
 	pg_atomic_init_u32(&(shared->cost_balance), 0);
 	pg_atomic_init_u32(&(shared->active_nworkers), 0);
 	pg_atomic_init_u32(&(shared->idx), 0);
+	pg_atomic_init_u32(&(shared->nindexes_processed), 0);
 
 	shm_toc_insert(pcxt->toc, PARALLEL_VACUUM_KEY_SHARED, shared);
 	pvs->shared = shared;
@@ -618,8 +634,9 @@ parallel_vacuum_process_all_indexes(ParallelVacuumState *pvs, int num_index_scan
 													vacuum));
 	}
 
-	/* Reset the parallel index processing counter */
+	/* Reset the parallel index processing and progress counters */
 	pg_atomic_write_u32(&(pvs->shared->idx), 0);
+	pg_atomic_write_u32(&(pvs->shared->nindexes_processed), 0);
 
 	/* Setup the shared cost-based vacuum delay and launch workers */
 	if (nworkers > 0)
@@ -888,6 +905,16 @@ parallel_vacuum_process_one_index(ParallelVacuumState *pvs, Relation indrel,
 	pvs->status = PARALLEL_INDVAC_STATUS_COMPLETED;
 	pfree(pvs->indname);
 	pvs->indname = NULL;
+
+	/*
+	 * Update the index vacuum progress information. Since the progress is
+	 * updated only by the leader, the worker notifies the leader of it.
+	 */
+	pg_atomic_add_fetch_u32(&(pvs->shared->nindexes_processed), 1);
+	if (IsParallelWorker())
+		pq_putmessage('P', NULL, 0);
+	else
+		parallel_vacuum_update_progress(pvs);
 }
 
 /*
@@ -1072,3 +1099,19 @@ parallel_vacuum_error_callback(void *arg)
 			return;
 	}
 }
+
+/*
+ * Update the number of indexes processed so far in the current index bulk-deletion
+ * or index cleanup.
+ */
+void
+parallel_vacuum_update_progress(void *arg)
+{
+	ParallelVacuumState *pvs = (ParallelVacuumState *) arg;
+
+	Assert(!IsParallelWorker());
+	Assert(pvs);
+
+	pgstat_progress_update_param(PROGRESS_VACUUM_INDEX_PROCESSED,
+								 pg_atomic_read_u32(&(pvs->shared->nindexes_processed)));
+}
diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h
index 061f8a4c4c..f621d51c0d 100644
--- a/src/include/access/parallel.h
+++ b/src/include/access/parallel.h
@@ -21,6 +21,7 @@
 #include "storage/shm_toc.h"
 
 typedef void (*parallel_worker_main_type) (dsm_segment *seg, shm_toc *toc);
+typedef void (*parallel_progress_callback_type) (void *arg);
 
 typedef struct ParallelWorkerInfo
 {
@@ -46,6 +47,8 @@ typedef struct ParallelContext
 	ParallelWorkerInfo *worker;
 	int			nknown_attached_workers;
 	bool	   *known_attached_workers;
+	parallel_progress_callback_type parallel_progress_callback;
+	void		*parallel_progress_callback_arg;
 } ParallelContext;
 
 typedef struct ParallelWorkerContext
diff --git a/src/include/commands/progress.h b/src/include/commands/progress.h
index e5add41352..23c38f2d0e 100644
--- a/src/include/commands/progress.h
+++ b/src/include/commands/progress.h
@@ -25,6 +25,8 @@
 #define PROGRESS_VACUUM_NUM_INDEX_VACUUMS		4
 #define PROGRESS_VACUUM_MAX_DEAD_TUPLES			5
 #define PROGRESS_VACUUM_NUM_DEAD_TUPLES			6
+#define PROGRESS_VACUUM_INDEX_TOTAL             7
+#define PROGRESS_VACUUM_INDEX_PROCESSED         8
 
 /* Phases of vacuum (as advertised via PROGRESS_VACUUM_PHASE) */
 #define PROGRESS_VACUUM_PHASE_SCAN_HEAP			1
diff --git a/src/include/commands/vacuum.h b/src/include/commands/vacuum.h
index 689dbb7702..7b13069d33 100644
--- a/src/include/commands/vacuum.h
+++ b/src/include/commands/vacuum.h
@@ -370,5 +370,6 @@ extern bool std_typanalyze(VacAttrStats *stats);
 extern double anl_random_fract(void);
 extern double anl_init_selection_state(int n);
 extern double anl_get_next_S(double t, int n, double *stateptr);
+extern void parallel_vacuum_update_progress(void *arg);
 
 #endif							/* VACUUM_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index a3a5a62329..b1badc485d 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -2033,7 +2033,9 @@ pg_stat_progress_vacuum| SELECT s.pid,
     s.param4 AS heap_blks_vacuumed,
     s.param5 AS index_vacuum_count,
     s.param6 AS max_dead_tuples,
-    s.param7 AS num_dead_tuples
+    s.param7 AS num_dead_tuples,
+    s.param8 AS indexes_total,
+    s.param9 AS indexes_processed
    FROM (pg_stat_get_progress_info('VACUUM'::text) s(pid, datid, relid, param1, param2, param3, param4, param5, param6, param7, param8, param9, param10, param11, param12, param13, param14, param15, param16, param17, param18, param19, param20)
      LEFT JOIN pg_database d ON ((s.datid = d.oid)));
 pg_stat_recovery_prefetch| SELECT stats_reset,
-- 
2.37.1 (Apple Git-137.1)

