Hello, Matthias and others!

I realized that the new horizon was applied only during the validation phase
(once the index is marked as ready).
Now it is applied whenever the index is not yet marked as valid.
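
To illustrate the new rule, here is a minimal stand-alone sketch. It is not
the patch code: IndexFlags, HorizonKind and horizon_for_table are
hypothetical names made up for this example, and the real decision lives in
RelationGetIndexList / GlobalVisHorizonKindForRel in the attached patch.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified model of the two data horizons. */
typedef enum
{
	HORIZON_DATA,			/* ordinary data horizon */
	HORIZON_DATA_SAFE_IC	/* horizon that also honors safe index builders */
} HorizonKind;

/* Hypothetical subset of the pg_index flags relevant here. */
typedef struct
{
	bool		indisready;
	bool		indisvalid;
} IndexFlags;

/*
 * Pick the horizon for a table.  The safe-IC horizon is used as soon as any
 * index is not yet valid (regardless of indisready), provided at least one
 * safe concurrent build is running somewhere in the cluster.
 */
static HorizonKind
horizon_for_table(const IndexFlags *indexes, size_t nindexes,
				  bool any_safe_build_running)
{
	if (!any_safe_build_running)
		return HORIZON_DATA;

	for (size_t i = 0; i < nindexes; i++)
	{
		if (!indexes[i].indisvalid)
			return HORIZON_DATA_SAFE_IC;
	}
	return HORIZON_DATA;
}
```

The point is only that the check looks at indisvalid, so the building phase
(index not ready yet) is covered as well as the validation phase.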

The updated version is attached.

--------------------------------------------------

> I think the best way for this to work would be an index method that
> exclusively stores TIDs, and of which we can quickly determine new
> tuples, too. I was thinking about something like GIN's format, but
> using (generation number, tid) instead of ([colno, colvalue], tid) as
> key data for the internal trees, and would be unlogged (because the
> data wouldn't have to survive a crash). Then we could do something
> like this for the second table scan phase:

Regarding that approach to dealing with the validation phase and resetting
the snapshot:

I was thinking about it and realized: once we go for an additional index,
we don't need the second heap scan at all!

We may do it this way:

* create the target index, not yet marked as indisready
* create a temporary unlogged index with the same parameters to store TIDs
(optionally with the indexed columns' data, see below), marked as indisready
(but not indisvalid)
* commit them both in a single transaction
* wait for other transactions to learn about them and honor them in HOT
constraints and new inserts (for the temporary index)
* from now on, the temporary index is filled with the tuples inserted into
the table
* start building the target index, resetting the snapshot every so often (if
it is a "safe" index)
* finish the target index building phase
* mark the target index as indisready
* now, start validation of the index:
    * take the reference snapshot
    * take a visibility snapshot of the target index, sort it (as is done
currently)
    * take a visibility snapshot of the temporary index, sort it
    * start the merging loop using two synchronized cursors over both
visibility snapshots
        * if we encounter a TID which is not present in the target
visibility snapshot
            * insert it into the target index
                * if the temporary index contains the column data, we may
even avoid the tuple fetch
                * if the temporary index is TID-only, we fetch the tuple from
the heap, but as a plus we also skip inserting dead tuples into the new
index (I think this is the better option)
    * commit everything, release the reference snapshot
* wait for transactions older than the reference snapshot (as is done
currently)
* mark the target index as indisvalid, drop the temporary index
* done
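
The merging loop from the validation step can be sketched roughly like this.
It is a stand-alone illustration under simplifying assumptions, not code from
the patch: EncodedTid and collect_missing_tids are hypothetical names, TIDs
are assumed to be already encoded as sortable integers (similar in spirit to
itemptr_encode), and both arrays are assumed sorted and free of duplicates.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for a heap TID encoded as one sortable integer. */
typedef int64_t EncodedTid;

/*
 * Walk two ascending TID arrays in lock step and copy every TID that is
 * present in aux[] (the temporary index) but absent from target[] (the
 * target index) into missing[].  Returns the number of TIDs written;
 * missing[] must have room for naux entries.
 */
static size_t
collect_missing_tids(const EncodedTid *target, size_t ntarget,
					 const EncodedTid *aux, size_t naux,
					 EncodedTid *missing)
{
	size_t		t = 0;
	size_t		nmissing = 0;

	for (size_t a = 0; a < naux; a++)
	{
		/* precondition: aux[] is strictly ascending */
		assert(a == 0 || aux[a - 1] < aux[a]);

		/* advance the target cursor past everything smaller than aux[a] */
		while (t < ntarget && target[t] < aux[a])
			t++;

		/* not found in the target index: this TID still has to be inserted */
		if (t == ntarget || target[t] != aux[a])
			missing[nmissing++] = aux[a];
	}
	return nmissing;
}
```

Each cursor only moves forward, so the loop is linear in the total number of
TIDs. In the scheme above, every collected TID would then be fetched from the
heap (or taken from the temporary index, if it stores column data) and
inserted into the target index.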


So, pros:
* just a single heap scan
* the snapshot is reset periodically

Cons:
* we need to maintain the additional index during the main building phase
* one more tuplesort

If the temporary index is unlogged and cheap to maintain (just append-only
mechanics), this feels like a perfect tradeoff to me.

This approach will work perfectly with a low number of tuple inserts during
the building phase. And it looks like even in the worst case it is still
better than the current approach.

What do you think? Have I missed something?

Thanks,
Michail.
From 4878cc22c9176e5bf2b7d3d9d8c95cc66c8ac007 Mon Sep 17 00:00:00 2001
From: nkey <michail.nikol...@gmail.com>
Date: Wed, 8 May 2024 22:31:33 +0200
Subject: [PATCH v4] WIP: fix d9d076222f5b "VACUUM: ignore indexing operations 
 with CONCURRENTLY" which was reverted by e28bb8851969.

The issue was caused by the absence of any snapshot that actually protects the relation data required to build the index correctly.

Introduce a new type of visibility horizon to be used for relations with concurrently built indexes (in the case of a "safe" index).

Now `GlobalVisHorizonKindForRel` may dynamically decide which horizon to use, based on the data about safe indexes being built concurrently.

To reduce the performance impact, a counter of concurrently built indexes is maintained in shared memory.
---
 src/backend/catalog/index.c              |  36 ++++++
 src/backend/commands/indexcmds.c         |  20 +++
 src/backend/storage/ipc/ipci.c           |   2 +
 src/backend/storage/ipc/procarray.c      |  85 ++++++++++++-
 src/backend/utils/cache/relcache.c       |  11 ++
 src/bin/pg_amcheck/t/006_concurrently.pl | 155 +++++++++++++++++++++++
 src/include/catalog/index.h              |   5 +
 src/include/utils/rel.h                  |   1 +
 8 files changed, 309 insertions(+), 6 deletions(-)
 create mode 100644 src/bin/pg_amcheck/t/006_concurrently.pl

diff --git a/src/backend/catalog/index.c b/src/backend/catalog/index.c
index 5a8568c55c..3caa2bab12 100644
--- a/src/backend/catalog/index.c
+++ b/src/backend/catalog/index.c
@@ -97,6 +97,11 @@ typedef struct
 	Oid			pendingReindexedIndexes[FLEXIBLE_ARRAY_MEMBER];
 } SerializedReindexState;
 
+typedef struct {
+	pg_atomic_uint32 numSafeConcurrentlyBuiltIndexes;
+} SafeICSharedState;
+static SafeICSharedState *SafeICStateShmem;
+
 /* non-export function prototypes */
 static bool relationHasPrimaryKey(Relation rel);
 static TupleDesc ConstructTupleDescriptor(Relation heapRelation,
@@ -176,6 +181,37 @@ relationHasPrimaryKey(Relation rel)
 	return result;
 }
 
+
+void SafeICStateShmemInit(void)
+{
+	bool		found;
+
+	SafeICStateShmem = (SafeICSharedState *)
+			ShmemInitStruct("Safe Concurrently Build Indexes",
+							sizeof(SafeICSharedState),
+							&found);
+
+	if (!IsUnderPostmaster)
+	{
+		Assert(!found);
+		pg_atomic_init_u32(&SafeICStateShmem->numSafeConcurrentlyBuiltIndexes, 0);
+	} else
+		Assert(found);
+}
+
+void UpdateNumSafeConcurrentlyBuiltIndexes(bool increment)
+{
+	if (increment)
+		pg_atomic_fetch_add_u32(&SafeICStateShmem->numSafeConcurrentlyBuiltIndexes, 1);
+	else
+		pg_atomic_fetch_sub_u32(&SafeICStateShmem->numSafeConcurrentlyBuiltIndexes, 1);
+}
+
+bool IsAnySafeIndexBuildsConcurrently(void)
+{
+	return pg_atomic_read_u32(&SafeICStateShmem->numSafeConcurrentlyBuiltIndexes) > 0;
+}
+
 /*
  * index_check_primary_key
  *		Apply special checks needed before creating a PRIMARY KEY index
diff --git a/src/backend/commands/indexcmds.c b/src/backend/commands/indexcmds.c
index d9016ef487..663450ba20 100644
--- a/src/backend/commands/indexcmds.c
+++ b/src/backend/commands/indexcmds.c
@@ -1636,6 +1636,8 @@ DefineIndex(Oid tableId,
 	 * hold lock on the parent table.  This might need to change later.
 	 */
 	LockRelationIdForSession(&heaprelid, ShareUpdateExclusiveLock);
+	if (safe_index && concurrent)
+		UpdateNumSafeConcurrentlyBuiltIndexes(true);
 
 	PopActiveSnapshot();
 	CommitTransactionCommand();
@@ -1804,7 +1806,15 @@ DefineIndex(Oid tableId,
 	 * to replan; so relcache flush on the index itself was sufficient.)
 	 */
 	CacheInvalidateRelcacheByRelid(heaprelid.relId);
+	/* Commit index as valid before reducing counter of safe concurrently built indexes */
+	CommitTransactionCommand();
 
+	Assert(concurrent);
+	if (safe_index)
+		UpdateNumSafeConcurrentlyBuiltIndexes(false);
+
+	/* Start a new transaction to finish process properly */
+	StartTransactionCommand();
 	/*
 	 * Last thing to do is release the session-level lock on the parent table.
 	 */
@@ -3902,6 +3912,8 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
 					 indexRel->rd_indpred == NIL);
 		idx->tableId = RelationGetRelid(heapRel);
 		idx->amId = indexRel->rd_rel->relam;
+		if (idx->safe)
+			UpdateNumSafeConcurrentlyBuiltIndexes(true);
 
 		/* This function shouldn't be called for temporary relations. */
 		if (indexRel->rd_rel->relpersistence == RELPERSISTENCE_TEMP)
@@ -4345,6 +4357,14 @@ ReindexRelationConcurrently(const ReindexStmt *stmt, Oid relationOid, const Rein
 		UnlockRelationIdForSession(lockrelid, ShareUpdateExclusiveLock);
 	}
 
+	/* Now we may clear the safe index building flags */
+	foreach(lc, newIndexIds)
+	{
+		ReindexIndexInfo *newidx = lfirst(lc);
+		if (newidx->safe)
+			UpdateNumSafeConcurrentlyBuiltIndexes(false);
+	}
+
 	/* Start a new transaction to finish process properly */
 	StartTransactionCommand();
 
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 521ed5418c..260a634f1b 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -24,6 +24,7 @@
 #include "access/twophase.h"
 #include "access/xlogprefetcher.h"
 #include "access/xlogrecovery.h"
+#include "catalog/index.h"
 #include "commands/async.h"
 #include "miscadmin.h"
 #include "pgstat.h"
@@ -357,6 +358,7 @@ CreateOrAttachShmemStructs(void)
 	StatsShmemInit();
 	WaitEventExtensionShmemInit();
 	InjectionPointShmemInit();
+	SafeICStateShmemInit();
 }
 
 /*
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 1a83c4220b..446df34dab 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -53,6 +53,7 @@
 #include "access/xact.h"
 #include "access/xlogutils.h"
 #include "catalog/catalog.h"
+#include "catalog/index.h"
 #include "catalog/pg_authid.h"
 #include "commands/dbcommands.h"
 #include "miscadmin.h"
@@ -236,6 +237,12 @@ typedef struct ComputeXidHorizonsResult
 	 */
 	TransactionId data_oldest_nonremovable;
 
+	/*
+	 * Oldest xid for which deleted tuples need to be retained in normal user
+	 * defined tables with index building in progress by process with PROC_IN_SAFE_IC.
+	 */
+	TransactionId data_safe_ic_oldest_nonremovable;
+
 	/*
 	 * Oldest xid for which deleted tuples need to be retained in this
 	 * session's temporary tables.
@@ -251,6 +258,7 @@ typedef enum GlobalVisHorizonKind
 	VISHORIZON_SHARED,
 	VISHORIZON_CATALOG,
 	VISHORIZON_DATA,
+	VISHORIZON_DATA_SAFE_IC,
 	VISHORIZON_TEMP,
 } GlobalVisHorizonKind;
 
@@ -297,6 +305,7 @@ static TransactionId standbySnapshotPendingXmin;
 static GlobalVisState GlobalVisSharedRels;
 static GlobalVisState GlobalVisCatalogRels;
 static GlobalVisState GlobalVisDataRels;
+static GlobalVisState GlobalVisDataSafeIcRels;
 static GlobalVisState GlobalVisTempRels;
 
 /*
@@ -1727,9 +1736,6 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 	bool		in_recovery = RecoveryInProgress();
 	TransactionId *other_xids = ProcGlobal->xids;
 
-	/* inferred after ProcArrayLock is released */
-	h->catalog_oldest_nonremovable = InvalidTransactionId;
-
 	LWLockAcquire(ProcArrayLock, LW_SHARED);
 
 	h->latest_completed = TransamVariables->latestCompletedXid;
@@ -1749,7 +1755,9 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 
 		h->oldest_considered_running = initial;
 		h->shared_oldest_nonremovable = initial;
+		h->catalog_oldest_nonremovable = initial;
 		h->data_oldest_nonremovable = initial;
+		h->data_safe_ic_oldest_nonremovable = initial;
 
 		/*
 		 * Only modifications made by this backend affect the horizon for
@@ -1847,11 +1855,28 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 			(statusFlags & PROC_AFFECTS_ALL_HORIZONS) ||
 			in_recovery)
 		{
-			h->data_oldest_nonremovable =
-				TransactionIdOlder(h->data_oldest_nonremovable, xmin);
+			h->data_safe_ic_oldest_nonremovable =
+					TransactionIdOlder(h->data_safe_ic_oldest_nonremovable, xmin);
+
+			if (!(statusFlags & PROC_IN_SAFE_IC))
+				h->data_oldest_nonremovable =
+					TransactionIdOlder(h->data_oldest_nonremovable, xmin);
+
+			/* Catalog tables need to consider all backends in this db */
+			h->catalog_oldest_nonremovable =
+				TransactionIdOlder(h->catalog_oldest_nonremovable, xmin);
+
 		}
 	}
 
+	/* catalog horizon should never be later than data */
+	Assert(TransactionIdPrecedesOrEquals(h->catalog_oldest_nonremovable,
+										 h->data_oldest_nonremovable));
+
+	/* safe index building horizon should never be later than data horizon */
+	Assert(TransactionIdPrecedesOrEquals(h->data_safe_ic_oldest_nonremovable,
+										 h->data_oldest_nonremovable));
+
 	/*
 	 * If in recovery fetch oldest xid in KnownAssignedXids, will be applied
 	 * after lock is released.
@@ -1873,6 +1898,10 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 			TransactionIdOlder(h->shared_oldest_nonremovable, kaxmin);
 		h->data_oldest_nonremovable =
 			TransactionIdOlder(h->data_oldest_nonremovable, kaxmin);
+		h->data_safe_ic_oldest_nonremovable =
+				TransactionIdOlder(h->data_safe_ic_oldest_nonremovable, kaxmin);
+		h->catalog_oldest_nonremovable =
+			TransactionIdOlder(h->catalog_oldest_nonremovable, kaxmin);
 		/* temp relations cannot be accessed in recovery */
 	}
 
@@ -1880,6 +1909,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 										 h->shared_oldest_nonremovable));
 	Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable,
 										 h->data_oldest_nonremovable));
+	Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable,
+										 h->data_safe_ic_oldest_nonremovable));
 
 	/*
 	 * Check whether there are replication slots requiring an older xmin.
@@ -1888,6 +1919,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 		TransactionIdOlder(h->shared_oldest_nonremovable, h->slot_xmin);
 	h->data_oldest_nonremovable =
 		TransactionIdOlder(h->data_oldest_nonremovable, h->slot_xmin);
+	h->data_safe_ic_oldest_nonremovable =
+			TransactionIdOlder(h->data_safe_ic_oldest_nonremovable, h->slot_xmin);
 
 	/*
 	 * The only difference between catalog / data horizons is that the slot's
@@ -1900,7 +1933,9 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 	h->shared_oldest_nonremovable =
 		TransactionIdOlder(h->shared_oldest_nonremovable,
 						   h->slot_catalog_xmin);
-	h->catalog_oldest_nonremovable = h->data_oldest_nonremovable;
+	h->catalog_oldest_nonremovable =
+		TransactionIdOlder(h->catalog_oldest_nonremovable,
+						   h->slot_xmin);
 	h->catalog_oldest_nonremovable =
 		TransactionIdOlder(h->catalog_oldest_nonremovable,
 						   h->slot_catalog_xmin);
@@ -1918,6 +1953,9 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 	h->oldest_considered_running =
 		TransactionIdOlder(h->oldest_considered_running,
 						   h->data_oldest_nonremovable);
+	h->oldest_considered_running =
+			TransactionIdOlder(h->oldest_considered_running,
+							   h->data_safe_ic_oldest_nonremovable);
 
 	/*
 	 * shared horizons have to be at least as old as the oldest visible in
@@ -1925,6 +1963,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 	 */
 	Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable,
 										 h->data_oldest_nonremovable));
+	Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable,
+										 h->data_safe_ic_oldest_nonremovable));
 	Assert(TransactionIdPrecedesOrEquals(h->shared_oldest_nonremovable,
 										 h->catalog_oldest_nonremovable));
 
@@ -1938,6 +1978,8 @@ ComputeXidHorizons(ComputeXidHorizonsResult *h)
 										 h->catalog_oldest_nonremovable));
 	Assert(TransactionIdPrecedesOrEquals(h->oldest_considered_running,
 										 h->data_oldest_nonremovable));
+	Assert(TransactionIdPrecedesOrEquals(h->oldest_considered_running,
+										 h->data_safe_ic_oldest_nonremovable));
 	Assert(TransactionIdPrecedesOrEquals(h->oldest_considered_running,
 										 h->temp_oldest_nonremovable));
 	Assert(!TransactionIdIsValid(h->slot_xmin) ||
@@ -1973,7 +2015,21 @@ GlobalVisHorizonKindForRel(Relation rel)
 			 RelationIsAccessibleInLogicalDecoding(rel))
 		return VISHORIZON_CATALOG;
 	else if (!RELATION_IS_LOCAL(rel))
+	{
+		// TODO: do we need to do something special about the TOAST?
+		if (!rel->rd_indexvalid)
+		{
+			/* Skip loading indexes if we know there are no safe concurrent index builds in the cluster */
+			if (IsAnySafeIndexBuildsConcurrently())
+			{
+				RelationGetIndexList(rel);
+				Assert(rel->rd_indexvalid);
+			} else return VISHORIZON_DATA;
+		}
+		if (rel->rd_safeindexconcurrentlybuilding)
+			return VISHORIZON_DATA_SAFE_IC;
 		return VISHORIZON_DATA;
+	}
 	else
 		return VISHORIZON_TEMP;
 }
@@ -2004,6 +2060,8 @@ GetOldestNonRemovableTransactionId(Relation rel)
 			return horizons.catalog_oldest_nonremovable;
 		case VISHORIZON_DATA:
 			return horizons.data_oldest_nonremovable;
+		case VISHORIZON_DATA_SAFE_IC:
+			return horizons.data_safe_ic_oldest_nonremovable;
 		case VISHORIZON_TEMP:
 			return horizons.temp_oldest_nonremovable;
 	}
@@ -2454,6 +2512,9 @@ GetSnapshotData(Snapshot snapshot)
 		GlobalVisDataRels.definitely_needed =
 			FullTransactionIdNewer(def_vis_fxid_data,
 								   GlobalVisDataRels.definitely_needed);
+		GlobalVisDataSafeIcRels.definitely_needed =
+				FullTransactionIdNewer(def_vis_fxid_data,
+									   GlobalVisDataSafeIcRels.definitely_needed);
 		/* See temp_oldest_nonremovable computation in ComputeXidHorizons() */
 		if (TransactionIdIsNormal(myxid))
 			GlobalVisTempRels.definitely_needed =
@@ -2478,6 +2539,9 @@ GetSnapshotData(Snapshot snapshot)
 		GlobalVisCatalogRels.maybe_needed =
 			FullTransactionIdNewer(GlobalVisCatalogRels.maybe_needed,
 								   oldestfxid);
+		GlobalVisDataSafeIcRels.maybe_needed =
+				FullTransactionIdNewer(GlobalVisDataSafeIcRels.maybe_needed,
+									   oldestfxid);
 		GlobalVisDataRels.maybe_needed =
 			FullTransactionIdNewer(GlobalVisDataRels.maybe_needed,
 								   oldestfxid);
@@ -4106,6 +4170,9 @@ GlobalVisTestFor(Relation rel)
 		case VISHORIZON_DATA:
 			state = &GlobalVisDataRels;
 			break;
+		case VISHORIZON_DATA_SAFE_IC:
+			state = &GlobalVisDataSafeIcRels;
+			break;
 		case VISHORIZON_TEMP:
 			state = &GlobalVisTempRels;
 			break;
@@ -4158,6 +4225,9 @@ GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons)
 	GlobalVisDataRels.maybe_needed =
 		FullXidRelativeTo(horizons->latest_completed,
 						  horizons->data_oldest_nonremovable);
+	GlobalVisDataSafeIcRels.maybe_needed =
+			FullXidRelativeTo(horizons->latest_completed,
+							  horizons->data_safe_ic_oldest_nonremovable);
 	GlobalVisTempRels.maybe_needed =
 		FullXidRelativeTo(horizons->latest_completed,
 						  horizons->temp_oldest_nonremovable);
@@ -4176,6 +4246,9 @@ GlobalVisUpdateApply(ComputeXidHorizonsResult *horizons)
 	GlobalVisDataRels.definitely_needed =
 		FullTransactionIdNewer(GlobalVisDataRels.maybe_needed,
 							   GlobalVisDataRels.definitely_needed);
+	GlobalVisDataSafeIcRels.definitely_needed =
+			FullTransactionIdNewer(GlobalVisDataSafeIcRels.maybe_needed,
+								   GlobalVisDataSafeIcRels.definitely_needed);
 	GlobalVisTempRels.definitely_needed = GlobalVisTempRels.maybe_needed;
 
 	ComputeXidHorizonsResultLastXmin = RecentXmin;
diff --git a/src/backend/utils/cache/relcache.c b/src/backend/utils/cache/relcache.c
index 262c9878dd..93b7794b48 100644
--- a/src/backend/utils/cache/relcache.c
+++ b/src/backend/utils/cache/relcache.c
@@ -41,6 +41,7 @@
 #include "access/xact.h"
 #include "catalog/binary_upgrade.h"
 #include "catalog/catalog.h"
+#include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "catalog/partition.h"
@@ -4769,6 +4770,7 @@ RelationGetIndexList(Relation relation)
 	Oid			pkeyIndex = InvalidOid;
 	Oid			candidateIndex = InvalidOid;
 	bool		pkdeferrable = false;
+	bool 		safeindexconcurrentlybuilding = false;
 	MemoryContext oldcxt;
 
 	/* Quick exit if we already computed the list. */
@@ -4809,6 +4811,14 @@ RelationGetIndexList(Relation relation)
 		/* add index's OID to result list */
 		result = lappend_oid(result, index->indexrelid);
 
+		/*
+		 * Consider index as building if it is not yet valid.
+		 * Also, we must deal only with indexes which are built using the
+		 * concurrent safe mode.
+		 */
+		if (!index->indisvalid)
+			safeindexconcurrentlybuilding |= IsAnySafeIndexBuildsConcurrently();
+
 		/*
 		 * Non-unique or predicate indexes aren't interesting for either oid
 		 * indexes or replication identity indexes, so don't check them.
@@ -4869,6 +4879,7 @@ RelationGetIndexList(Relation relation)
 	relation->rd_indexlist = list_copy(result);
 	relation->rd_pkindex = pkeyIndex;
 	relation->rd_ispkdeferrable = pkdeferrable;
+	relation->rd_safeindexconcurrentlybuilding = safeindexconcurrentlybuilding;
 	if (replident == REPLICA_IDENTITY_DEFAULT && OidIsValid(pkeyIndex) && !pkdeferrable)
 		relation->rd_replidindex = pkeyIndex;
 	else if (replident == REPLICA_IDENTITY_INDEX && OidIsValid(candidateIndex))
diff --git a/src/bin/pg_amcheck/t/006_concurrently.pl b/src/bin/pg_amcheck/t/006_concurrently.pl
new file mode 100644
index 0000000000..7b8afeead5
--- /dev/null
+++ b/src/bin/pg_amcheck/t/006_concurrently.pl
@@ -0,0 +1,155 @@
+
+# Copyright (c) 2024, PostgreSQL Global Development Group
+
+# Test REINDEX CONCURRENTLY with concurrent modifications and HOT updates
+use strict;
+use warnings;
+
+use Config;
+use Errno;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Time::HiRes qw(usleep);
+use IPC::SysV;
+use threads;
+use Test::More;
+use Test::Builder;
+
+if ($@ || $windows_os)
+{
+	plan skip_all => 'Fork and shared memory are not supported by this platform';
+}
+
+# TODO: refactor to https://metacpan.org/pod/IPC%3A%3AShareable
+my ($pid, $shmem_id, $shmem_key,  $shmem_size);
+eval 'sub IPC_CREAT {0001000}' unless defined &IPC_CREAT;
+$shmem_size = 4;
+$shmem_key = rand(1000000);
+$shmem_id = shmget($shmem_key, $shmem_size, &IPC_CREAT | 0777) or die "Can't shmget: $!";
+shmwrite($shmem_id, "wait", 0, $shmem_size) or die "Can't shmwrite: $!";
+
+my $psql_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+#
+# Test set-up
+#
+my ($node, $result);
+$node = PostgreSQL::Test::Cluster->new('RC_test');
+$node->init;
+$node->append_conf('postgresql.conf',
+	'lock_timeout = ' . (1000 * $PostgreSQL::Test::Utils::timeout_default));
+$node->append_conf('postgresql.conf', 'fsync = off');
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+$node->safe_psql('postgres', q(CREATE TABLE tbl(i int primary key,
+								c1 money default 0,c2 money default 0,
+								c3 money default 0, updated_at timestamp)));
+$node->safe_psql('postgres', q(CREATE INDEX idx ON tbl(i)));
+
+my $builder = Test::More->builder;
+$builder->use_numbers(0);
+$builder->no_plan();
+
+my $child  = $builder->child("pg_bench");
+
+if(!defined($pid = fork())) {
+	# fork returned undef, so unsuccessful
+	die "Cannot fork a child: $!";
+} elsif ($pid == 0) {
+
+	$node->pgbench(
+		'--no-vacuum --client=5 --transactions=25000',
+		0,
+		[qr{actually processed}],
+		[qr{^$}],
+		'concurrent INSERTs, UPDATES and RC',
+		{
+			'002_pgbench_concurrent_transaction_inserts' => q(
+				BEGIN;
+				INSERT INTO tbl VALUES(random()*10000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				INSERT INTO tbl VALUES(random()*10000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				INSERT INTO tbl VALUES(random()*10000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				INSERT INTO tbl VALUES(random()*10000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+
+				INSERT INTO tbl VALUES(random()*10000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				COMMIT;
+			  ),
+			# Ensure some HOT updates happen
+			'002_pgbench_concurrent_transaction_updates' => q(
+				BEGIN;
+				INSERT INTO tbl VALUES(random()*1000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				INSERT INTO tbl VALUES(random()*1000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				INSERT INTO tbl VALUES(random()*1000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				INSERT INTO tbl VALUES(random()*1000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+
+				INSERT INTO tbl VALUES(random()*1000,0,0,0,now())
+					on conflict(i) do update set updated_at = now();
+				COMMIT;
+			  )
+		});
+
+	if ($child->is_passing()) {
+		shmwrite($shmem_id, "done", 0, $shmem_size) or die "Can't shmwrite: $!";
+	} else {
+		shmwrite($shmem_id, "fail", 0, $shmem_size) or die "Can't shmwrite: $!";
+	}
+
+	sleep(1);
+} else {
+	my $pg_bench_fork_flag;
+	shmread($shmem_id, $pg_bench_fork_flag, 0, $shmem_size) or die "Can't shmread: $!";
+
+	subtest 'reindex run subtest' => sub {
+		is($pg_bench_fork_flag, "wait", "pg_bench_fork_flag is correct");
+
+		my %psql = (stdin => '', stdout => '', stderr => '');
+		$psql{run} = IPC::Run::start(
+			[ 'psql', '-XA', '-f', '-', '-d', $node->connstr('postgres') ],
+			'<',
+			\$psql{stdin},
+			'>',
+			\$psql{stdout},
+			'2>',
+			\$psql{stderr},
+			$psql_timeout);
+
+		my ($result, $stdout, $stderr);
+		while (1)
+		{
+
+			($result, $stdout, $stderr) = $node->psql('postgres', q(REINDEX INDEX CONCURRENTLY idx;));
+			is($result, '0', 'REINDEX is correct');
+
+			($result, $stdout, $stderr) = $node->psql('postgres', q(SELECT bt_index_parent_check('idx', true, true);));
+			is($result, '0', 'bt_index_parent_check is correct');
+ 			if ($result)
+ 			{
+				diag($stderr);
+ 			}
+
+			shmread($shmem_id, $pg_bench_fork_flag, 0, $shmem_size) or die "Can't shmread: $!";
+			last if $pg_bench_fork_flag ne "wait";
+		}
+
+		# explicitly shut down psql instances gracefully
+        $psql{stdin} .= "\\q\n";
+        $psql{run}->finish;
+
+		is($pg_bench_fork_flag, "done", "pg_bench_fork_flag is correct");
+	};
+
+
+	$child->finalize();
+	$child->summary();
+	$node->stop;
+	done_testing();
+}
diff --git a/src/include/catalog/index.h b/src/include/catalog/index.h
index 2dea96f47c..cac413e5eb 100644
--- a/src/include/catalog/index.h
+++ b/src/include/catalog/index.h
@@ -175,6 +175,11 @@ extern void RestoreReindexState(const void *reindexstate);
 
 extern void IndexSetParentIndex(Relation partitionIdx, Oid parentOid);
 
+extern void SafeICStateShmemInit(void);
+// TODO: bound by relation or database
+extern void UpdateNumSafeConcurrentlyBuiltIndexes(bool increment);
+extern bool IsAnySafeIndexBuildsConcurrently(void);
+
 
 /*
  * itemptr_encode - Encode ItemPointer as int64/int8
diff --git a/src/include/utils/rel.h b/src/include/utils/rel.h
index 8700204953..e3c7899203 100644
--- a/src/include/utils/rel.h
+++ b/src/include/utils/rel.h
@@ -152,6 +152,7 @@ typedef struct RelationData
 	List	   *rd_indexlist;	/* list of OIDs of indexes on relation */
 	Oid			rd_pkindex;		/* OID of (deferrable?) primary key, if any */
 	bool		rd_ispkdeferrable;	/* is rd_pkindex a deferrable PK? */
+	bool		rd_safeindexconcurrentlybuilding; /* is safe concurrent index building in progress for relation */
 	Oid			rd_replidindex; /* OID of replica identity index, if any */
 
 	/* data managed by RelationGetStatExtList: */
-- 
2.34.1
