On Fri, 2009-12-04 at 09:20 +0000, Simon Riggs wrote:
> On Tue, 2009-12-01 at 01:43 -0800, Jeff Davis wrote:
> > Marking as ready.
> 
> You're saying it's ready, yet there are 3 additional suggested patches
> attached here. Please can you resolve these and re-submit a single final
> patch from author and reviewer?

My apologies. At the time, I thought a couple of days might matter, and
the changes are in areas that committers tend to editorialize anyway:
docs and a style issue. The only substantial change was to vacuumdb.c.

Complete patch attached including my edits.
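
For anyone skimming before opening the attachment, here is a rough
standalone sketch of how the vacuumdb.c change assembles the
parenthesized option list for servers that understand it. It is not the
patch code: the real code uses PQExpBuffer and checks the server
version first, and the names build_vacuum_command and add_opt are made
up for this illustration. The trailing newline the real code sends is
omitted.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: append one option, opening the list with " ("
 * for the first option and separating later ones with ", ". */
static void
add_opt(char *buf, size_t size, bool *first, const char *opt)
{
	size_t		len = strlen(buf);

	snprintf(buf + len, size - len, "%s%s", *first ? " (" : ", ", opt);
	*first = false;
}

/* Hypothetical stand-in for the command building in vacuum_one_database(). */
void
build_vacuum_command(char *buf, size_t size, bool full, bool inplace,
					 bool freeze, bool verbose, bool analyze,
					 const char *table)
{
	bool		first = true;
	size_t		len;

	assert(buf != NULL && size > 0);
	assert(full || !inplace);	/* INPLACE requires FULL */

	snprintf(buf, size, "VACUUM");

	if (full)
		add_opt(buf, size, &first, inplace ? "FULL INPLACE" : "FULL");
	if (freeze)
		add_opt(buf, size, &first, "FREEZE");
	if (verbose)
		add_opt(buf, size, &first, "VERBOSE");
	if (analyze)
		add_opt(buf, size, &first, "ANALYZE");

	/* Close the list only if at least one option opened it. */
	len = strlen(buf);
	if (!first)
		snprintf(buf + len, size - len, ")");

	len = strlen(buf);
	if (table)
		snprintf(buf + len, size - len, " %s", table);

	len = strlen(buf);
	snprintf(buf + len, size - len, ";");
}
```

With no options set the sketch emits a bare "VACUUM;", so the
parentheses appear only when there is something to put in them.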

> * What happens if you issue VACUUM FULL; which we would expect to use
> the new method of vacuum on all tables in the database. Won't that just
> fail with an error when it comes to catalog tables? Sounds to me like we
> should avoid the error and just silently do an INPLACE on catalog
> tables.

That's how it works: a plain VACUUM FULL silently uses the in-place
method for catalog tables, with no error.
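
The decision in vacuum_rel() boils down to something like the following
standalone sketch. The flag names and values here (OPT_FULL,
OPT_INPLACE) and the function choose_vacuum_method are illustrative
assumptions that only mirror the shape of the VACOPT_* tests in the
patch; the real code calls lazy_vacuum_rel(), full_vacuum_rel() or
cluster_rel() instead of returning an enum.

```c
#include <assert.h>
#include <stdbool.h>

/* Illustrative option bits; the values are not those of VACOPT_*. */
enum
{
	OPT_FULL = 1 << 0,
	OPT_INPLACE = 1 << 1
};

typedef enum
{
	METHOD_LAZY,				/* ordinary lazy vacuum */
	METHOD_FULL_INPLACE,		/* old-style VACUUM FULL, compacts in place */
	METHOD_FULL_REWRITE			/* rewrite heap and indexes via cluster_rel */
} VacMethod;

VacMethod
choose_vacuum_method(int options, bool is_system_relation)
{
	/* INPLACE is only meaningful together with FULL */
	assert((options & OPT_FULL) || !(options & OPT_INPLACE));

	if (!(options & OPT_FULL))
		return METHOD_LAZY;

	/* System relations cannot be rewritten, so fall back silently */
	if ((options & OPT_INPLACE) || is_system_relation)
		return METHOD_FULL_INPLACE;

	return METHOD_FULL_REWRITE;
}
```

So a database-wide VACUUM FULL takes the rewrite path for user tables
and the in-place path for catalogs, without raising an error.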

> * Such a pivotal change to Postgres needs more test coverage than a
> single line in regression tests. It might have been OK before, but I
> think we need a few more combinations here, at least in this release:
> with, without indexes, empty table, clustered, non-clustered etc and of
> course a full database VACUUM so that we have the catalog table case
> covered, plus an explicit catalog table vacuum.

It was my impression that the regression tests aren't meant to be
exhaustive, but merely to exercise a good portion of the code to help
detect simple breakage. Also, pg_regress isn't good at detecting a lot
of the problems that vacuum might have (how do you even know whether the
vacuum happened in-place or not?).

We could put a VACUUM FULL; and a VACUUM (FULL INPLACE); somewhere,
which would cover a lot of the cases you're talking about. However, that
may slow the regression run noticeably, especially for people who
develop on laptops.

In general, though, I think the right place for this is a longer test
suite that is meant to be run less frequently.

Regards,
        Jeff Davis

*** a/doc/src/sgml/ref/vacuum.sgml
--- b/doc/src/sgml/ref/vacuum.sgml
***************
*** 21,27 **** PostgreSQL documentation
  
   <refsynopsisdiv>
  <synopsis>
! VACUUM [ ( { FULL | FREEZE | VERBOSE | ANALYZE } [, ...] ) ] [ <replaceable class="PARAMETER">table</replaceable> [ (<replaceable class="PARAMETER">column</replaceable> [, ...] ) ] ]
  VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ <replaceable class="PARAMETER">table</replaceable> ]
  VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] ANALYZE [ <replaceable class="PARAMETER">table</replaceable> [ (<replaceable class="PARAMETER">column</replaceable> [, ...] ) ] ]
  </synopsis>
--- 21,27 ----
  
   <refsynopsisdiv>
  <synopsis>
! VACUUM [ ( { FULL [ INPLACE ] | FREEZE | VERBOSE | ANALYZE } [, ...] ) ] [ <replaceable class="PARAMETER">table</replaceable> [ (<replaceable class="PARAMETER">column</replaceable> [, ...] ) ] ]
  VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] [ <replaceable class="PARAMETER">table</replaceable> ]
  VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] ANALYZE [ <replaceable class="PARAMETER">table</replaceable> [ (<replaceable class="PARAMETER">column</replaceable> [, ...] ) ] ]
  </synopsis>
***************
*** 86,91 **** VACUUM [ FULL ] [ FREEZE ] [ VERBOSE ] ANALYZE [ <replaceable class="PARAMETER">
--- 86,112 ----
        Selects <quote>full</quote> vacuum, which can reclaim more
        space, but takes much longer and exclusively locks the table.
       </para>
+      <para>
+       For user tables, all table data and indexes are rewritten. This
+       method requires extra disk space in which to write the new data,
+       and is generally useful when a significant amount of space needs
+       to be reclaimed from within the table.
+      </para>
+      <para>
+       For system tables, all table data and indexes are modified in
+       place to reclaim space. This method may require less disk space
+       for the table data than <command>VACUUM FULL</command> on a
+       comparable user table, but the indexes will grow, which may
+       counteract that benefit. Additionally, the operation is often
+       slower than <command>VACUUM FULL</command> on a comparable user
+       table.
+      </para>
+      <para>
+       If <literal>FULL INPLACE</literal> is specified, the space is
+       reclaimed in the same manner as a system table, even if it is a
+       user table. Specifying <literal>INPLACE</literal> explicitly is
+       rarely useful.
+      </para>
      </listitem>
     </varlistentry>
  
*** a/doc/src/sgml/ref/vacuumdb.sgml
--- b/doc/src/sgml/ref/vacuumdb.sgml
***************
*** 24,29 **** PostgreSQL documentation
--- 24,30 ----
     <command>vacuumdb</command>
     <arg rep="repeat"><replaceable>connection-option</replaceable></arg>
     <group><arg>--full</arg><arg>-f</arg></group>
+    <group><arg>--inplace</arg><arg>-i</arg></group>
     <group><arg>--verbose</arg><arg>-v</arg></group>
     <group><arg>--analyze</arg><arg>-z</arg></group>
     <group><arg>--freeze</arg><arg>-F</arg></group>
***************
*** 36,41 **** PostgreSQL documentation
--- 37,43 ----
     <arg rep="repeat"><replaceable>connection-options</replaceable></arg>
     <group><arg>--all</arg><arg>-a</arg></group>
     <group><arg>--full</arg><arg>-f</arg></group>
+    <group><arg>--inplace</arg><arg>-i</arg></group>
     <group><arg>--verbose</arg><arg>-v</arg></group>
     <group><arg>--analyze</arg><arg>-z</arg></group>
     <group><arg>--freeze</arg><arg>-F</arg></group>
***************
*** 111,117 **** PostgreSQL documentation
        <term><option>--full</option></term>
        <listitem>
         <para>
!         Perform <quote>full</quote> vacuuming.
         </para>
        </listitem>
       </varlistentry>
--- 113,129 ----
        <term><option>--full</option></term>
        <listitem>
         <para>
!         Perform <quote>full replace</quote> vacuuming.
!        </para>
!       </listitem>
!      </varlistentry>
! 
!      <varlistentry>
!       <term><option>-i</option></term>
!       <term><option>--inplace</option></term>
!       <listitem>
!        <para>
!         Perform <quote>full inplace</quote> vacuuming.
         </para>
        </listitem>
       </varlistentry>
*** a/src/backend/commands/cluster.c
--- b/src/backend/commands/cluster.c
***************
*** 61,69 **** typedef struct
  } RelToCluster;
  
  
! static void cluster_rel(RelToCluster *rv, bool recheck, bool verbose);
! static void rebuild_relation(Relation OldHeap, Oid indexOid);
! static TransactionId copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex);
  static List *get_tables_to_cluster(MemoryContext cluster_context);
  
  
--- 61,70 ----
  } RelToCluster;
  
  
! static void rebuild_relation(Relation OldHeap, Oid indexOid,
! 							 int freeze_min_age, int freeze_table_age);
! static TransactionId copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap,
! 					Oid OIDOldIndex, int freeze_min_age, int freeze_table_age);
  static List *get_tables_to_cluster(MemoryContext cluster_context);
  
  
***************
*** 101,107 **** cluster(ClusterStmt *stmt, bool isTopLevel)
  		Oid			tableOid,
  					indexOid = InvalidOid;
  		Relation	rel;
- 		RelToCluster rvtc;
  
  		/* Find and lock the table */
  		rel = heap_openrv(stmt->relation, AccessExclusiveLock);
--- 102,107 ----
***************
*** 169,183 **** cluster(ClusterStmt *stmt, bool isTopLevel)
  							  stmt->indexname, stmt->relation->relname)));
  		}
  
- 		/* All other checks are done in cluster_rel() */
- 		rvtc.tableOid = tableOid;
- 		rvtc.indexOid = indexOid;
- 
  		/* close relation, keep lock till commit */
  		heap_close(rel, NoLock);
  
  		/* Do the job */
! 		cluster_rel(&rvtc, false, stmt->verbose);
  	}
  	else
  	{
--- 169,179 ----
  							  stmt->indexname, stmt->relation->relname)));
  		}
  
  		/* close relation, keep lock till commit */
  		heap_close(rel, NoLock);
  
  		/* Do the job */
! 		cluster_rel(tableOid, indexOid, false, stmt->verbose, -1, -1);
  	}
  	else
  	{
***************
*** 226,232 **** cluster(ClusterStmt *stmt, bool isTopLevel)
  			StartTransactionCommand();
  			/* functions in indexes may want a snapshot set */
  			PushActiveSnapshot(GetTransactionSnapshot());
! 			cluster_rel(rvtc, true, stmt->verbose);
  			PopActiveSnapshot();
  			CommitTransactionCommand();
  		}
--- 222,228 ----
  			StartTransactionCommand();
  			/* functions in indexes may want a snapshot set */
  			PushActiveSnapshot(GetTransactionSnapshot());
! 			cluster_rel(rvtc->tableOid, rvtc->indexOid, true, stmt->verbose, -1, -1);
  			PopActiveSnapshot();
  			CommitTransactionCommand();
  		}
***************
*** 253,260 **** cluster(ClusterStmt *stmt, bool isTopLevel)
   * the new table, it's better to create the indexes afterwards than to fill
   * them incrementally while we load the table.
   */
! static void
! cluster_rel(RelToCluster *rvtc, bool recheck, bool verbose)
  {
  	Relation	OldHeap;
  
--- 249,257 ----
   * the new table, it's better to create the indexes afterwards than to fill
   * them incrementally while we load the table.
   */
! void
! cluster_rel(Oid tableOid, Oid indexOid, bool recheck, bool verbose,
! 			int freeze_min_age, int freeze_table_age)
  {
  	Relation	OldHeap;
  
***************
*** 267,273 **** cluster_rel(RelToCluster *rvtc, bool recheck, bool verbose)
  	 * case, since cluster() already did it.)  The index lock is taken inside
  	 * check_index_is_clusterable.
  	 */
! 	OldHeap = try_relation_open(rvtc->tableOid, AccessExclusiveLock);
  
  	/* If the table has gone away, we can skip processing it */
  	if (!OldHeap)
--- 264,270 ----
  	 * case, since cluster() already did it.)  The index lock is taken inside
  	 * check_index_is_clusterable.
  	 */
! 	OldHeap = try_relation_open(tableOid, AccessExclusiveLock);
  
  	/* If the table has gone away, we can skip processing it */
  	if (!OldHeap)
***************
*** 287,293 **** cluster_rel(RelToCluster *rvtc, bool recheck, bool verbose)
  		Form_pg_index indexForm;
  
  		/* Check that the user still owns the relation */
! 		if (!pg_class_ownercheck(rvtc->tableOid, GetUserId()))
  		{
  			relation_close(OldHeap, AccessExclusiveLock);
  			return;
--- 284,290 ----
  		Form_pg_index indexForm;
  
  		/* Check that the user still owns the relation */
! 		if (!pg_class_ownercheck(tableOid, GetUserId()))
  		{
  			relation_close(OldHeap, AccessExclusiveLock);
  			return;
***************
*** 308,360 **** cluster_rel(RelToCluster *rvtc, bool recheck, bool verbose)
  			return;
  		}
  
! 		/*
! 		 * Check that the index still exists
! 		 */
! 		if (!SearchSysCacheExists(RELOID,
! 								  ObjectIdGetDatum(rvtc->indexOid),
! 								  0, 0, 0))
  		{
! 			relation_close(OldHeap, AccessExclusiveLock);
! 			return;
! 		}
  
! 		/*
! 		 * Check that the index is still the one with indisclustered set.
! 		 */
! 		tuple = SearchSysCache(INDEXRELID,
! 							   ObjectIdGetDatum(rvtc->indexOid),
! 							   0, 0, 0);
! 		if (!HeapTupleIsValid(tuple))	/* probably can't happen */
! 		{
! 			relation_close(OldHeap, AccessExclusiveLock);
! 			return;
! 		}
! 		indexForm = (Form_pg_index) GETSTRUCT(tuple);
! 		if (!indexForm->indisclustered)
! 		{
  			ReleaseSysCache(tuple);
- 			relation_close(OldHeap, AccessExclusiveLock);
- 			return;
  		}
- 		ReleaseSysCache(tuple);
  	}
  
! 	/* Check index is valid to cluster on */
! 	check_index_is_clusterable(OldHeap, rvtc->indexOid, recheck);
  
  	/* rebuild_relation does all the dirty work */
  	ereport(verbose ? INFO : DEBUG2,
  			(errmsg("clustering \"%s.%s\"",
  					get_namespace_name(RelationGetNamespace(OldHeap)),
  					RelationGetRelationName(OldHeap))));
! 	rebuild_relation(OldHeap, rvtc->indexOid);
  
  	/* NB: rebuild_relation does heap_close() on OldHeap */
  }
  
  /*
!  * Verify that the specified index is a legitimate index to cluster on
   *
   * Side effect: obtains exclusive lock on the index.  The caller should
   * already have exclusive lock on the table, so the index lock is likely
--- 305,360 ----
  			return;
  		}
  
! 		if (OidIsValid(indexOid))
  		{
! 			/*
! 			 * Check that the index still exists
! 			 */
! 			if (!SearchSysCacheExists(RELOID,
! 									  ObjectIdGetDatum(indexOid),
! 									  0, 0, 0))
! 			{
! 				relation_close(OldHeap, AccessExclusiveLock);
! 				return;
! 			}
  
! 			/*
! 			 * Check that the index is still the one with indisclustered set.
! 			 */
! 			tuple = SearchSysCache(INDEXRELID,
! 								   ObjectIdGetDatum(indexOid),
! 								   0, 0, 0);
! 			if (!HeapTupleIsValid(tuple))	/* probably can't happen */
! 			{
! 				relation_close(OldHeap, AccessExclusiveLock);
! 				return;
! 			}
! 			indexForm = (Form_pg_index) GETSTRUCT(tuple);
! 			if (!indexForm->indisclustered)
! 			{
! 				ReleaseSysCache(tuple);
! 				relation_close(OldHeap, AccessExclusiveLock);
! 				return;
! 			}
  			ReleaseSysCache(tuple);
  		}
  	}
  
! 	/* Check heap and index are valid to cluster on */
! 	check_index_is_clusterable(OldHeap, indexOid, recheck);
  
  	/* rebuild_relation does all the dirty work */
  	ereport(verbose ? INFO : DEBUG2,
  			(errmsg("clustering \"%s.%s\"",
  					get_namespace_name(RelationGetNamespace(OldHeap)),
  					RelationGetRelationName(OldHeap))));
! 	rebuild_relation(OldHeap, indexOid, freeze_min_age, freeze_table_age);
  
  	/* NB: rebuild_relation does heap_close() on OldHeap */
  }
  
  /*
!  * Verify that the specified heap and index are valid to cluster on
   *
   * Side effect: obtains exclusive lock on the index.  The caller should
   * already have exclusive lock on the table, so the index lock is likely
***************
*** 366,371 **** check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck)
--- 366,403 ----
  {
  	Relation	OldIndex;
  
+ 	/*
+ 	 * Disallow clustering system relations.  This will definitely NOT work
+ 	 * for shared relations (we have no way to update pg_class rows in other
+ 	 * databases), nor for nailed-in-cache relations (the relfilenode values
+ 	 * for those are hardwired, see relcache.c).  It might work for other
+ 	 * system relations, but I ain't gonna risk it.
+ 	 */
+ 	if (IsSystemRelation(OldHeap))
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 				 errmsg("\"%s\" is a system catalog",
+ 						RelationGetRelationName(OldHeap))));
+ 
+ 	/*
+ 	 * Don't allow cluster on temp tables of other backends ... their local
+ 	 * buffer manager is not going to cope.
+ 	 */
+ 	if (RELATION_IS_OTHER_TEMP(OldHeap))
+ 		ereport(ERROR,
+ 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ 			   errmsg("cannot cluster temporary tables of other sessions")));
+ 
+ 	/*
+ 	 * Also check for active uses of the relation in the current transaction,
+ 	 * including open scans and pending AFTER trigger events.
+ 	 */
+ 	CheckTableNotInUse(OldHeap, "CLUSTER");
+ 
+ 	/* Skip checks for index if not specified. */
+ 	if (!OidIsValid(indexOid))
+ 		return;
+ 
  	OldIndex = index_open(indexOid, AccessExclusiveLock);
  
  	/*
***************
*** 448,481 **** check_index_is_clusterable(Relation OldHeap, Oid indexOid, bool recheck)
  				 errmsg("cannot cluster on invalid index \"%s\"",
  						RelationGetRelationName(OldIndex))));
  
- 	/*
- 	 * Disallow clustering system relations.  This will definitely NOT work
- 	 * for shared relations (we have no way to update pg_class rows in other
- 	 * databases), nor for nailed-in-cache relations (the relfilenode values
- 	 * for those are hardwired, see relcache.c).  It might work for other
- 	 * system relations, but I ain't gonna risk it.
- 	 */
- 	if (IsSystemRelation(OldHeap))
- 		ereport(ERROR,
- 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- 				 errmsg("\"%s\" is a system catalog",
- 						RelationGetRelationName(OldHeap))));
- 
- 	/*
- 	 * Don't allow cluster on temp tables of other backends ... their local
- 	 * buffer manager is not going to cope.
- 	 */
- 	if (RELATION_IS_OTHER_TEMP(OldHeap))
- 		ereport(ERROR,
- 				(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- 			   errmsg("cannot cluster temporary tables of other sessions")));
- 
- 	/*
- 	 * Also check for active uses of the relation in the current transaction,
- 	 * including open scans and pending AFTER trigger events.
- 	 */
- 	CheckTableNotInUse(OldHeap, "CLUSTER");
- 
  	/* Drop relcache refcnt on OldIndex, but keep lock */
  	index_close(OldIndex, NoLock);
  }
--- 480,485 ----
***************
*** 565,571 **** mark_index_clustered(Relation rel, Oid indexOid)
   * NB: this routine closes OldHeap at the right time; caller should not.
   */
  static void
! rebuild_relation(Relation OldHeap, Oid indexOid)
  {
  	Oid			tableOid = RelationGetRelid(OldHeap);
  	Oid			tableSpace = OldHeap->rd_rel->reltablespace;
--- 569,576 ----
   * NB: this routine closes OldHeap at the right time; caller should not.
   */
  static void
! rebuild_relation(Relation OldHeap, Oid indexOid,
! 				 int freeze_min_age, int freeze_table_age)
  {
  	Oid			tableOid = RelationGetRelid(OldHeap);
  	Oid			tableSpace = OldHeap->rd_rel->reltablespace;
***************
*** 576,582 **** rebuild_relation(Relation OldHeap, Oid indexOid)
  	Relation	newrel;
  
  	/* Mark the correct index as clustered */
! 	mark_index_clustered(OldHeap, indexOid);
  
  	/* Close relcache entry, but keep lock until transaction commit */
  	heap_close(OldHeap, NoLock);
--- 581,588 ----
  	Relation	newrel;
  
  	/* Mark the correct index as clustered */
! 	if (OidIsValid(indexOid))
! 		mark_index_clustered(OldHeap, indexOid);
  
  	/* Close relcache entry, but keep lock until transaction commit */
  	heap_close(OldHeap, NoLock);
***************
*** 599,605 **** rebuild_relation(Relation OldHeap, Oid indexOid)
  	/*
  	 * Copy the heap data into the new table in the desired order.
  	 */
! 	frozenXid = copy_heap_data(OIDNewHeap, tableOid, indexOid);
  
  	/* To make the new heap's data visible (probably not needed?). */
  	CommandCounterIncrement();
--- 605,612 ----
  	/*
  	 * Copy the heap data into the new table in the desired order.
  	 */
! 	frozenXid = copy_heap_data(OIDNewHeap, tableOid, indexOid,
! 							   freeze_min_age, freeze_table_age);
  
  	/* To make the new heap's data visible (probably not needed?). */
  	CommandCounterIncrement();
***************
*** 758,764 **** make_new_heap(Oid OIDOldHeap, const char *NewName, Oid NewTableSpace)
   * freeze cutoff point for the tuples.
   */
  static TransactionId
! copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  {
  	Relation	NewHeap,
  				OldHeap,
--- 765,772 ----
   * freeze cutoff point for the tuples.
   */
  static TransactionId
! copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex,
! 			   int freeze_min_age, int freeze_table_age)
  {
  	Relation	NewHeap,
  				OldHeap,
***************
*** 768,775 **** copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  	int			natts;
  	Datum	   *values;
  	bool	   *isnull;
! 	IndexScanDesc scan;
! 	HeapTuple	tuple;
  	bool		use_wal;
  	TransactionId OldestXmin;
  	TransactionId FreezeXid;
--- 776,783 ----
  	int			natts;
  	Datum	   *values;
  	bool	   *isnull;
! 	IndexScanDesc indexScan;
! 	HeapScanDesc heapScan;
  	bool		use_wal;
  	TransactionId OldestXmin;
  	TransactionId FreezeXid;
***************
*** 780,786 **** copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  	 */
  	NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
  	OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
! 	OldIndex = index_open(OIDOldIndex, AccessExclusiveLock);
  
  	/*
  	 * Their tuple descriptors should be exactly alike, but here we only need
--- 788,797 ----
  	 */
  	NewHeap = heap_open(OIDNewHeap, AccessExclusiveLock);
  	OldHeap = heap_open(OIDOldHeap, AccessExclusiveLock);
! 	if (OidIsValid(OIDOldIndex))
! 		OldIndex = index_open(OIDOldIndex, AccessExclusiveLock);
! 	else
! 		OldIndex = NULL;
  
  	/*
  	 * Their tuple descriptors should be exactly alike, but here we only need
***************
*** 809,816 **** copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  	 * freeze_min_age to avoid having CLUSTER freeze tuples earlier than a
  	 * plain VACUUM would.
  	 */
! 	vacuum_set_xid_limits(-1, -1, OldHeap->rd_rel->relisshared,
! 						  &OldestXmin, &FreezeXid, NULL);
  
  	/*
  	 * FreezeXid will become the table's new relfrozenxid, and that mustn't go
--- 820,827 ----
  	 * freeze_min_age to avoid having CLUSTER freeze tuples earlier than a
  	 * plain VACUUM would.
  	 */
! 	vacuum_set_xid_limits(freeze_min_age, freeze_table_age,
! 				OldHeap->rd_rel->relisshared, &OldestXmin, &FreezeXid, NULL);
  
  	/*
  	 * FreezeXid will become the table's new relfrozenxid, and that mustn't go
***************
*** 828,852 **** copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  	 * copied, we scan with SnapshotAny and use HeapTupleSatisfiesVacuum for
  	 * the visibility test.
  	 */
! 	scan = index_beginscan(OldHeap, OldIndex,
  						   SnapshotAny, 0, (ScanKey) NULL);
  
! 	while ((tuple = index_getnext(scan, ForwardScanDirection)) != NULL)
  	{
  		HeapTuple	copiedTuple;
  		bool		isdead;
  		int			i;
  
  		CHECK_FOR_INTERRUPTS();
  
! 		/* Since we used no scan keys, should never need to recheck */
! 		if (scan->xs_recheck)
! 			elog(ERROR, "CLUSTER does not support lossy index conditions");
  
! 		LockBuffer(scan->xs_cbuf, BUFFER_LOCK_SHARE);
  
! 		switch (HeapTupleSatisfiesVacuum(tuple->t_data, OldestXmin,
! 										 scan->xs_cbuf))
  		{
  			case HEAPTUPLE_DEAD:
  				/* Definitely dead */
--- 839,884 ----
  	 * copied, we scan with SnapshotAny and use HeapTupleSatisfiesVacuum for
  	 * the visibility test.
  	 */
! 	if (OldIndex != NULL)
! 		indexScan = index_beginscan(OldHeap, OldIndex,
  						   SnapshotAny, 0, (ScanKey) NULL);
+ 	else
+ 		heapScan = heap_beginscan(OldHeap, SnapshotAny, 0, (ScanKey) NULL);
  
! 	for (;;)
  	{
+ 		HeapTuple	tuple;
  		HeapTuple	copiedTuple;
+ 		Buffer		buf;
  		bool		isdead;
  		int			i;
  
  		CHECK_FOR_INTERRUPTS();
  
! 		if (OldIndex != NULL)
! 		{
! 			tuple = index_getnext(indexScan, ForwardScanDirection);
! 			if (tuple == NULL)
! 				break;
  
! 			/* Since we used no scan keys, should never need to recheck */
! 			if (indexScan->xs_recheck)
! 				elog(ERROR, "CLUSTER does not support lossy index conditions");
  
! 			buf = indexScan->xs_cbuf;
! 		}
! 		else
! 		{
! 			tuple = heap_getnext(heapScan, ForwardScanDirection);
! 			if (tuple == NULL)
! 				break;
! 
! 			buf = heapScan->rs_cbuf;
! 		}
! 
! 		LockBuffer(buf, BUFFER_LOCK_SHARE);
! 
! 		switch (HeapTupleSatisfiesVacuum(tuple->t_data, OldestXmin, buf))
  		{
  			case HEAPTUPLE_DEAD:
  				/* Definitely dead */
***************
*** 888,894 **** copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  				break;
  		}
  
! 		LockBuffer(scan->xs_cbuf, BUFFER_LOCK_UNLOCK);
  
  		if (isdead)
  		{
--- 920,926 ----
  				break;
  		}
  
! 		LockBuffer(buf, BUFFER_LOCK_UNLOCK);
  
  		if (isdead)
  		{
***************
*** 932,938 **** copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  		heap_freetuple(copiedTuple);
  	}
  
! 	index_endscan(scan);
  
  	/* Write out any remaining tuples, and fsync if needed */
  	end_heap_rewrite(rwstate);
--- 964,973 ----
  		heap_freetuple(copiedTuple);
  	}
  
! 	if (OldIndex != NULL)
! 		index_endscan(indexScan);
! 	else
! 		heap_endscan(heapScan);
  
  	/* Write out any remaining tuples, and fsync if needed */
  	end_heap_rewrite(rwstate);
***************
*** 940,946 **** copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex)
  	pfree(values);
  	pfree(isnull);
  
! 	index_close(OldIndex, NoLock);
  	heap_close(OldHeap, NoLock);
  	heap_close(NewHeap, NoLock);
  
--- 975,982 ----
  	pfree(values);
  	pfree(isnull);
  
! 	if (OldIndex != NULL)
! 		index_close(OldIndex, NoLock);
  	heap_close(OldHeap, NoLock);
  	heap_close(NewHeap, NoLock);
  
*** a/src/backend/commands/vacuum.c
--- b/src/backend/commands/vacuum.c
***************
*** 29,38 ****
--- 29,40 ----
  #include "access/visibilitymap.h"
  #include "access/xact.h"
  #include "access/xlog.h"
+ #include "catalog/catalog.h"
  #include "catalog/namespace.h"
  #include "catalog/pg_database.h"
  #include "catalog/pg_namespace.h"
  #include "catalog/storage.h"
+ #include "commands/cluster.h"
  #include "commands/dbcommands.h"
  #include "commands/vacuum.h"
  #include "executor/executor.h"
***************
*** 301,306 **** vacuum(VacuumStmt *vacstmt, Oid relid, bool do_toast,
--- 303,310 ----
  	Assert((vacstmt->options & VACOPT_VACUUM) ||
  		   !(vacstmt->options & (VACOPT_FULL | VACOPT_FREEZE)));
  	Assert((vacstmt->options & VACOPT_ANALYZE) || vacstmt->va_cols == NIL);
+ 	Assert((vacstmt->options & VACOPT_FULL) ||
+ 		   !(vacstmt->options & VACOPT_INPLACE));
  
  	stmttype = (vacstmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
  
***************
*** 1192,1209 **** vacuum_rel(Oid relid, VacuumStmt *vacstmt, bool do_toast, bool for_wraparound,
  	SetUserIdAndContext(onerel->rd_rel->relowner, true);
  
  	/*
! 	 * Do the actual work --- either FULL or "lazy" vacuum
  	 */
! 	if (vacstmt->options & VACOPT_FULL)
  		heldoff = full_vacuum_rel(onerel, vacstmt);
  	else
! 		heldoff = lazy_vacuum_rel(onerel, vacstmt, vac_strategy, scanned_all);
  
  	/* Restore userid */
  	SetUserIdAndContext(save_userid, save_secdefcxt);
  
  	/* all done with this class, but hold lock until commit */
! 	relation_close(onerel, NoLock);
  
  	/*
  	 * Complete the transaction and free all temporary memory used.
--- 1196,1226 ----
  	SetUserIdAndContext(onerel->rd_rel->relowner, true);
  
  	/*
! 	 * Do the actual work --- either FULL, FULL INPLACE, or "lazy" vacuum.
! 	 * We can only use FULL INPLACE vacuum for system relations.
  	 */
! 	if (!(vacstmt->options & VACOPT_FULL))
! 		heldoff = lazy_vacuum_rel(onerel, vacstmt, vac_strategy, scanned_all);
! 	else if ((vacstmt->options & VACOPT_INPLACE) || IsSystemRelation(onerel))
  		heldoff = full_vacuum_rel(onerel, vacstmt);
  	else
! 	{
! 		/* close relation before clustering, but hold lock until commit */
! 		relation_close(onerel, NoLock);
! 		onerel = NULL;
! 
! 		cluster_rel(relid, InvalidOid, false,
! 			(vacstmt->options & VACOPT_VERBOSE) != 0,
! 			vacstmt->freeze_min_age, vacstmt->freeze_table_age);
! 		heldoff = false;
! 	}
  
  	/* Restore userid */
  	SetUserIdAndContext(save_userid, save_secdefcxt);
  
  	/* all done with this class, but hold lock until commit */
! 	if (onerel)
! 		relation_close(onerel, NoLock);
  
  	/*
  	 * Complete the transaction and free all temporary memory used.
*** a/src/backend/parser/gram.y
--- b/src/backend/parser/gram.y
***************
*** 486,492 **** static TypeName *TableFuncTypeName(List *columns);
  
  	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IN_P
  	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
! 	INNER_P INOUT INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
  	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
  
  	JOIN
--- 486,492 ----
  
  	IDENTITY_P IF_P ILIKE IMMEDIATE IMMUTABLE IMPLICIT_P IN_P
  	INCLUDING INCREMENT INDEX INDEXES INHERIT INHERITS INITIALLY INLINE_P
! 	INNER_P INOUT INPLACE INPUT_P INSENSITIVE INSERT INSTEAD INT_P INTEGER
  	INTERSECT INTERVAL INTO INVOKER IS ISNULL ISOLATION
  
  	JOIN
***************
*** 6708,6713 **** vacuum_option_elem:
--- 6708,6714 ----
  			| VERBOSE			{ $$ = VACOPT_VERBOSE; }
  			| FREEZE			{ $$ = VACOPT_FREEZE; }
  			| FULL				{ $$ = VACOPT_FULL; }
+ 			| FULL INPLACE		{ $$ = VACOPT_FULL | VACOPT_INPLACE; }
  		;
  
  AnalyzeStmt:
***************
*** 10649,10654 **** unreserved_keyword:
--- 10650,10656 ----
  			| INHERIT
  			| INHERITS
  			| INLINE_P
+ 			| INPLACE
  			| INPUT_P
  			| INSENSITIVE
  			| INSERT
*** a/src/bin/scripts/vacuumdb.c
--- b/src/bin/scripts/vacuumdb.c
***************
*** 14,28 ****
  #include "common.h"
  
  
! static void vacuum_one_database(const char *dbname, bool full, bool verbose, bool analyze,
! 					bool freeze, const char *table,
  					const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
  					const char *progname, bool echo);
! static void vacuum_all_databases(bool full, bool verbose, bool analyze, bool freeze,
! 					 const char *host, const char *port,
! 					 const char *username, enum trivalue prompt_password,
! 					 const char *progname, bool echo, bool quiet);
  
  static void help(const char *progname);
  
--- 14,29 ----
  #include "common.h"
  
  
! static void vacuum_one_database(const char *dbname, bool full, bool inplace,
! 					bool verbose, bool analyze, bool freeze, const char *table,
  					const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
  					const char *progname, bool echo);
! static void vacuum_all_databases(bool full, bool inplace,
! 					bool verbose, bool analyze, bool freeze,
! 					const char *host, const char *port,
! 					const char *username, enum trivalue prompt_password,
! 					const char *progname, bool echo, bool quiet);
  
  static void help(const char *progname);
  
***************
*** 45,50 **** main(int argc, char *argv[])
--- 46,52 ----
  		{"table", required_argument, NULL, 't'},
  		{"full", no_argument, NULL, 'f'},
  		{"verbose", no_argument, NULL, 'v'},
+ 		{"inplace", no_argument, NULL, 'i'},
  		{NULL, 0, NULL, 0}
  	};
  
***************
*** 65,77 **** main(int argc, char *argv[])
  	char	   *table = NULL;
  	bool		full = false;
  	bool		verbose = false;
  
  	progname = get_progname(argv[0]);
  	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
  
  	handle_help_version_opts(argc, argv, "vacuumdb", help);
  
! 	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zaFt:fv", long_options, &optindex)) != -1)
  	{
  		switch (c)
  		{
--- 67,80 ----
  	char	   *table = NULL;
  	bool		full = false;
  	bool		verbose = false;
+ 	bool		inplace = false;
  
  	progname = get_progname(argv[0]);
  	set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pgscripts"));
  
  	handle_help_version_opts(argc, argv, "vacuumdb", help);
  
! 	while ((c = getopt_long(argc, argv, "h:p:U:wWeqd:zaFt:fiv", long_options, &optindex)) != -1)
  	{
  		switch (c)
  		{
***************
*** 117,122 **** main(int argc, char *argv[])
--- 120,128 ----
  			case 'v':
  				verbose = true;
  				break;
+ 			case 'i':
+ 				inplace = true;
+ 				break;
  			default:
  				fprintf(stderr, _("Try \"%s --help\" for more information.\n"), progname);
  				exit(1);
***************
*** 137,142 **** main(int argc, char *argv[])
--- 143,156 ----
  			exit(1);
  	}
  
+ 	/* INPLACE option requires FULL option. */
+ 	if (inplace && !full)
+ 	{
+ 		fprintf(stderr, _("%s: -i, --inplace option requires -f, --full\n"),
+ 				progname);
+ 		exit(1);
+ 	}
+ 
  	setup_cancel_handler();
  
  	if (alldb)
***************
*** 154,160 **** main(int argc, char *argv[])
  			exit(1);
  		}
  
! 		vacuum_all_databases(full, verbose, analyze, freeze,
  							 host, port, username, prompt_password,
  							 progname, echo, quiet);
  	}
--- 168,174 ----
  			exit(1);
  		}
  
! 		vacuum_all_databases(full, inplace, verbose, analyze, freeze,
  							 host, port, username, prompt_password,
  							 progname, echo, quiet);
  	}
***************
*** 170,177 **** main(int argc, char *argv[])
  				dbname = get_user_name(progname);
  		}
  
! 		vacuum_one_database(dbname, full, verbose, analyze, freeze, table,
! 							host, port, username, prompt_password,
  							progname, echo);
  	}
  
--- 184,191 ----
  				dbname = get_user_name(progname);
  		}
  
! 		vacuum_one_database(dbname, full, inplace, verbose, analyze, freeze,
! 							table, host, port, username, prompt_password,
  							progname, echo);
  	}
  
***************
*** 180,187 **** main(int argc, char *argv[])
  
  
  static void
! vacuum_one_database(const char *dbname, bool full, bool verbose, bool analyze,
! 					bool freeze, const char *table,
  					const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
  					const char *progname, bool echo)
--- 194,201 ----
  
  
  static void
! vacuum_one_database(const char *dbname, bool full, bool inplace, bool verbose,
! 					bool analyze, bool freeze, const char *table,
  					const char *host, const char *port,
  					const char *username, enum trivalue prompt_password,
  					const char *progname, bool echo)
***************
*** 189,211 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool analyze,
  	PQExpBufferData sql;
  
  	PGconn	   *conn;
  
  	initPQExpBuffer(&sql);
  
  	appendPQExpBuffer(&sql, "VACUUM");
! 	if (full)
! 		appendPQExpBuffer(&sql, " FULL");
! 	if (freeze)
! 		appendPQExpBuffer(&sql, " FREEZE");
! 	if (verbose)
! 		appendPQExpBuffer(&sql, " VERBOSE");
! 	if (analyze)
! 		appendPQExpBuffer(&sql, " ANALYZE");
  	if (table)
  		appendPQExpBuffer(&sql, " %s", table);
  	appendPQExpBuffer(&sql, ";\n");
  
- 	conn = connectDatabase(dbname, host, port, username, prompt_password, progname);
  	if (!executeMaintenanceCommand(conn, sql.data, echo))
  	{
  		if (table)
--- 203,264 ----
  	PQExpBufferData sql;
  
  	PGconn	   *conn;
+ 	int			version;
+ 	bool		first_opt = true;
  
  	initPQExpBuffer(&sql);
  
+ 	conn = connectDatabase(dbname, host, port, username, prompt_password, progname);
+ 	version = PQserverVersion(conn);
+ 
  	appendPQExpBuffer(&sql, "VACUUM");
! 
! 	if (version >= 80500)
! 	{
! 		if (full)
! 		{
! 			appendPQExpBuffer(&sql, "%sFULL%s", first_opt ? " (" : ", ",
! 							  inplace ? " INPLACE" : "");
! 			first_opt = false;
! 		}
! 		if (freeze)
! 		{
! 			appendPQExpBuffer(&sql, "%sFREEZE", first_opt ? " (" : ", ");
! 			first_opt = false;
! 		}
! 		if (verbose)
! 		{
! 			appendPQExpBuffer(&sql, "%sVERBOSE", first_opt ? " (" : ", ");
! 			first_opt = false;
! 		}
! 		if (analyze)
! 		{
! 			appendPQExpBuffer(&sql, "%sANALYZE", first_opt ? " (" : ", ");
! 			first_opt = false;
! 		}
! 		if (!first_opt)
! 			appendPQExpBuffer(&sql, ")");
! 	}
! 	else
! 	{
! 		/*
! 		 * On older servers, VACUUM FULL is equivalent to VACUUM (FULL
! 		 * INPLACE) on newer servers, so we can ignore 'inplace'.
! 		 */
! 		if (full)
! 			appendPQExpBuffer(&sql, " FULL");
! 		if (freeze)
! 			appendPQExpBuffer(&sql, " FREEZE");
! 		if (verbose)
! 			appendPQExpBuffer(&sql, " VERBOSE");
! 		if (analyze)
! 			appendPQExpBuffer(&sql, " ANALYZE");
! 	}
! 
  	if (table)
  		appendPQExpBuffer(&sql, " %s", table);
  	appendPQExpBuffer(&sql, ";\n");
  
  	if (!executeMaintenanceCommand(conn, sql.data, echo))
  	{
  		if (table)
***************
*** 223,230 **** vacuum_one_database(const char *dbname, bool full, bool verbose, bool analyze,
  
  
  static void
! vacuum_all_databases(bool full, bool verbose, bool analyze, bool freeze,
! 					 const char *host, const char *port,
  					 const char *username, enum trivalue prompt_password,
  					 const char *progname, bool echo, bool quiet)
  {
--- 276,283 ----
  
  
  static void
! vacuum_all_databases(bool full, bool inplace, bool verbose, bool analyze,
! 					 bool freeze, const char *host, const char *port,
  					 const char *username, enum trivalue prompt_password,
  					 const char *progname, bool echo, bool quiet)
  {
***************
*** 246,253 **** vacuum_all_databases(bool full, bool verbose, bool analyze, bool freeze,
  			fflush(stdout);
  		}
  
! 		vacuum_one_database(dbname, full, verbose, analyze, freeze, NULL,
! 							host, port, username, prompt_password,
  							progname, echo);
  	}
  
--- 299,306 ----
  			fflush(stdout);
  		}
  
! 		vacuum_one_database(dbname, full, inplace, verbose, analyze, freeze,
! 							NULL, host, port, username, prompt_password,
  							progname, echo);
  	}
  
***************
*** 267,272 **** help(const char *progname)
--- 320,326 ----
  	printf(_("  -e, --echo                      show the commands being sent to the server\n"));
  	printf(_("  -f, --full                      do full vacuuming\n"));
  	printf(_("  -F, --freeze                    freeze row transaction information\n"));
+ 	printf(_("  -i, --inplace                   do full inplace vacuuming\n"));
  	printf(_("  -q, --quiet                     don't write any messages\n"));
  	printf(_("  -t, --table='TABLE[(COLUMNS)]'  vacuum specific table only\n"));
  	printf(_("  -v, --verbose                   write a lot of output\n"));
*** a/src/include/commands/cluster.h
--- b/src/include/commands/cluster.h
***************
*** 18,24 ****
  
  
  extern void cluster(ClusterStmt *stmt, bool isTopLevel);
! 
  extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
  						   bool recheck);
  extern void mark_index_clustered(Relation rel, Oid indexOid);
--- 18,25 ----
  
  
  extern void cluster(ClusterStmt *stmt, bool isTopLevel);
! extern void cluster_rel(Oid tableOid, Oid indexOid, bool recheck,
! 					bool verbose, int freeze_min_age, int freeze_table_age);
  extern void check_index_is_clusterable(Relation OldHeap, Oid indexOid,
  						   bool recheck);
  extern void mark_index_clustered(Relation rel, Oid indexOid);
*** a/src/include/nodes/parsenodes.h
--- b/src/include/nodes/parsenodes.h
***************
*** 2222,2228 **** typedef enum VacuumOption
  	VACOPT_ANALYZE		= 1 << 1,	/* do ANALYZE */
  	VACOPT_VERBOSE		= 1 << 2,	/* print progress info */
  	VACOPT_FREEZE		= 1 << 3,	/* FREEZE option */
! 	VACOPT_FULL			= 1 << 4	/* FULL (non-concurrent) vacuum */
  } VacuumOption;
  
  typedef struct VacuumStmt
--- 2222,2229 ----
  	VACOPT_ANALYZE		= 1 << 1,	/* do ANALYZE */
  	VACOPT_VERBOSE		= 1 << 2,	/* print progress info */
  	VACOPT_FREEZE		= 1 << 3,	/* FREEZE option */
! 	VACOPT_FULL			= 1 << 4,	/* FULL (non-concurrent) vacuum */
! 	VACOPT_INPLACE		= 1 << 5	/* traditional FULL INPLACE vacuum */
  } VacuumOption;
  
  typedef struct VacuumStmt
*** a/src/include/parser/kwlist.h
--- b/src/include/parser/kwlist.h
***************
*** 192,197 **** PG_KEYWORD("initially", INITIALLY, RESERVED_KEYWORD)
--- 192,198 ----
  PG_KEYWORD("inline", INLINE_P, UNRESERVED_KEYWORD)
  PG_KEYWORD("inner", INNER_P, TYPE_FUNC_NAME_KEYWORD)
  PG_KEYWORD("inout", INOUT, COL_NAME_KEYWORD)
+ PG_KEYWORD("inplace", INPLACE, UNRESERVED_KEYWORD)
  PG_KEYWORD("input", INPUT_P, UNRESERVED_KEYWORD)
  PG_KEYWORD("insensitive", INSENSITIVE, UNRESERVED_KEYWORD)
  PG_KEYWORD("insert", INSERT, UNRESERVED_KEYWORD)
*** a/src/test/regress/expected/vacuum.out
--- b/src/test/regress/expected/vacuum.out
***************
*** 57,60 **** SELECT * FROM vactst;
--- 57,61 ----
  (0 rows)
  
  VACUUM (FULL, FREEZE) vactst;
+ VACUUM (ANALYZE, FULL INPLACE) vactst;
  DROP TABLE vactst;
*** a/src/test/regress/sql/vacuum.sql
--- b/src/test/regress/sql/vacuum.sql
***************
*** 40,44 **** DELETE FROM vactst;
--- 40,45 ----
  SELECT * FROM vactst;
  
  VACUUM (FULL, FREEZE) vactst;
+ VACUUM (ANALYZE, FULL INPLACE) vactst;
  
  DROP TABLE vactst;
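
For anyone reading the vacuumdb.c hunk without a tree handy, the command-building logic can be sketched on its own roughly as below. This is only an illustration, not the patch code: it uses a fixed-size buffer and snprintf instead of PQExpBuffer, and the names append_str and build_vacuum_command are hypothetical helpers invented for this sketch. The 80500 version cutoff and the option order (FULL, FREEZE, VERBOSE, ANALYZE) are taken from the patch above.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper: append src to dst, truncating at size bytes. */
static void
append_str(char *dst, size_t size, const char *src)
{
	size_t		len = strlen(dst);

	if (len < size)
		snprintf(dst + len, size - len, "%s", src);
}

/*
 * Hypothetical stand-in for the SQL-building part of vacuum_one_database().
 * Servers >= 8.5 get the parenthesized option list; older servers get the
 * traditional keyword syntax, where 'inplace' is ignored because plain
 * VACUUM FULL already works in place there.
 */
static void
build_vacuum_command(char *buf, size_t size, int version,
					 bool full, bool inplace, bool freeze,
					 bool verbose, bool analyze, const char *table)
{
	const char *opts[4];
	int			nopts = 0;
	int			i;
	bool		paren = (version >= 80500);

	assert(buf != NULL && size > 0);
	buf[0] = '\0';

	/* Collect the requested options in the same order as the patch. */
	if (full)
		opts[nopts++] = (paren && inplace) ? "FULL INPLACE" : "FULL";
	if (freeze)
		opts[nopts++] = "FREEZE";
	if (verbose)
		opts[nopts++] = "VERBOSE";
	if (analyze)
		opts[nopts++] = "ANALYZE";

	append_str(buf, size, "VACUUM");
	for (i = 0; i < nopts; i++)
	{
		if (paren)
			append_str(buf, size, i == 0 ? " (" : ", ");
		else
			append_str(buf, size, " ");
		append_str(buf, size, opts[i]);
	}
	/* Close the option list only if one was opened. */
	if (paren && nopts > 0)
		append_str(buf, size, ")");

	if (table)
	{
		append_str(buf, size, " ");
		append_str(buf, size, table);
	}
	append_str(buf, size, ";\n");
}
```

Under these assumptions, asking for full, inplace and analyze on table vactst against an 8.5 server should produce the same statement as the new regression test line, while the same flags against an 8.4 server fall back to the old keyword form with no INPLACE.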
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
