On Tue, 2008-10-07 at 07:21 +0100, Simon Riggs wrote:

> It was an excellent question because that aspect isn't handled correctly
> in the enclosed patch for subcommands, other than index-creating
> constraints. 
> 
> My main focus is on these commands
> * CREATE TRIGGER
> * ALTER TABLE ..  ADD PRIMARY KEY
> * ALTER TABLE ..  ADD FOREIGN KEY
> * CREATE RULE

> because those are the most painful ones. We could make it work against
> more, but we'd need to rewrite lots and lots of catalog update code.

OK, patch updated, tested, working.

Points of note.

1. I've left the infrastructure there for further changes, but we can
rip that out if desired.

2. Also need to decide whether we want pg_class.reltriggers as int2 (as
implemented here) or switch to relhastriggers as boolean.

3. The patch introduces a slight weirdness: if you create two FKs on the
same column at the same time you end up with two constraints with
identical names. DROP CONSTRAINT then removes them both, though in other
respects both are valid; their names just aren't unique. CREATE INDEX
avoids this by way of the unique index on relname. The equivalent index on
pg_constraint is not unique, and *cannot* be made unique without
breaking some corner cases of table inheritance.
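
For anyone skimming the patch, the lock-level policy can be sketched
roughly as below. This is a simplified standalone model, not the patch
code: the SubCmd enum and both function names are made up for the
example, and only a handful of subcommands are represented. The point it
tries to show is that the flag is only ever raised while walking the
subcommand list, so one subcommand that needs the full lock decides the
level for the whole command.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified stand-ins for the AT_* subcommand types. */
typedef enum
{
	SUB_ADD_COLUMN,
	SUB_ADD_PRIMARY_KEY,
	SUB_ADD_UNIQUE,
	SUB_ADD_FOREIGN_KEY,
	SUB_ADD_CHECK
} SubCmd;

/* True if this single subcommand is on the "ShareLock is enough" list. */
static bool
sub_allows_share_lock(SubCmd sub)
{
	switch (sub)
	{
		case SUB_ADD_PRIMARY_KEY:
		case SUB_ADD_UNIQUE:
		case SUB_ADD_FOREIGN_KEY:
			return true;
		default:
			/* If unsure, demand the stronger lock. */
			return false;
	}
}

/*
 * Returns true if the command as a whole needs the full (exclusive)
 * lock, false if the weaker share lock is enough.  The flag is raised
 * by any subcommand that is not on the safe list and is never lowered.
 */
static bool
needs_full_lock(const SubCmd *cmds, size_t ncmds)
{
	bool		need = false;
	size_t		i;

	assert(cmds != NULL || ncmds == 0);

	for (i = 0; i < ncmds; i++)
	{
		if (!sub_allows_share_lock(cmds[i]))
			need = true;
	}

	return need;
}
```

In this model, a list holding only ADD PRIMARY KEY and ADD FOREIGN KEY
gives false, while adding an ADD COLUMN anywhere in the list gives true
regardless of ordering.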

-- 
 Simon Riggs           www.2ndQuadrant.com
 PostgreSQL Training, Services and Support
Index: doc/src/sgml/mvcc.sgml
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/doc/src/sgml/mvcc.sgml,v
retrieving revision 2.69
diff -c -r2.69 mvcc.sgml
*** doc/src/sgml/mvcc.sgml	18 Feb 2007 01:21:49 -0000	2.69
--- doc/src/sgml/mvcc.sgml	7 Oct 2008 11:27:21 -0000
***************
*** 504,512 ****
      most <productname>PostgreSQL</productname> commands automatically
      acquire locks of appropriate modes to ensure that referenced
      tables are not dropped or modified in incompatible ways while the
!     command executes.  (For example, <command>ALTER TABLE</> cannot safely be
!     executed concurrently with other operations on the same table, so it
!     obtains an exclusive lock on the table to enforce that.)
     </para>
  
     <para>
--- 504,512 ----
      most <productname>PostgreSQL</productname> commands automatically
      acquire locks of appropriate modes to ensure that referenced
      tables are not dropped or modified in incompatible ways while the
!     command executes.  (For example, <command>TRUNCATE</> 
!     cannot safely be executed concurrently with other operations on the 
!     same table, so it obtains an exclusive lock on the table to enforce that.)
     </para>
  
     <para>
***************
*** 647,654 ****
          </para>
  
          <para>
!          Acquired by <command>CREATE INDEX</command>
!          (without <option>CONCURRENTLY</option>).
          </para>
         </listitem>
        </varlistentry>
--- 647,659 ----
          </para>
  
          <para>
!          Acquired by <command>CREATE INDEX</command> (without 
!          <option>CONCURRENTLY</option>), <command>CREATE TRIGGER</command>, 
!          <command>CREATE RULE</command> (all except ON SELECT rules),
!          <command>ALTER TABLE</command> (if all of its subcommands are one 
!          of the following subcommands: <command>ADD UNIQUE</command>, 
!          <command>ADD PRIMARY KEY</command> or 
!          <command>ADD FOREIGN KEY</command>).
          </para>
         </listitem>
        </varlistentry>
***************
*** 714,724 ****
          </para>
  
          <para>
!          Acquired by the <command>ALTER TABLE</command>, <command>DROP
!          TABLE</command>, <command>TRUNCATE</command>, <command>REINDEX</command>,
           <command>CLUSTER</command>, and <command>VACUUM FULL</command>
           commands.  This is also the default lock mode for <command>LOCK
           TABLE</command> statements that do not specify a mode explicitly.
          </para>
         </listitem>
        </varlistentry>
--- 719,731 ----
          </para>
  
          <para>
!          Acquired by the <command>DROP TABLE</command>, 
!          <command>TRUNCATE</command>, <command>REINDEX</command>,
           <command>CLUSTER</command>, and <command>VACUUM FULL</command>
           commands.  This is also the default lock mode for <command>LOCK
           TABLE</command> statements that do not specify a mode explicitly.
+          <command>ALTER TABLE</command> will take a lock at this level, 
+          with some exceptions, noted above under SHARE LOCK.
          </para>
         </listitem>
        </varlistentry>
Index: src/backend/commands/tablecmds.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/commands/tablecmds.c,v
retrieving revision 1.266
diff -c -r1.266 tablecmds.c
*** src/backend/commands/tablecmds.c	8 Sep 2008 00:47:40 -0000	1.266
--- src/backend/commands/tablecmds.c	7 Oct 2008 10:14:53 -0000
***************
*** 249,269 ****
  							 Relation rel, Relation pkrel, Oid constraintOid);
  static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
  						 Oid constraintOid);
! static void ATController(Relation rel, List *cmds, bool recurse);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing);
! static void ATRewriteCatalogs(List **wqueue);
  static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					  AlterTableCmd *cmd);
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd);
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
  				AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
--- 249,269 ----
  							 Relation rel, Relation pkrel, Oid constraintOid);
  static void createForeignKeyTriggers(Relation rel, FkConstraint *fkconstraint,
  						 Oid constraintOid);
! static void ATController(Relation rel, List *cmds, bool recurse, bool needFullLock);
  static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing, bool needFullLock);
! static void ATRewriteCatalogs(List **wqueue, bool needFullLock);
  static void ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					  AlterTableCmd *cmd, bool needFullLock);
  static void ATRewriteTables(List **wqueue);
  static void ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap);
  static AlteredTableInfo *ATGetQueueEntry(List **wqueue, Relation rel);
  static void ATSimplePermissions(Relation rel, bool allowView);
  static void ATSimplePermissionsRelationOrIndex(Relation rel);
  static void ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse, bool needFullLock);
  static void ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd, bool needFullLock);
  static void ATPrepAddColumn(List **wqueue, Relation rel, bool recurse,
  				AlterTableCmd *cmd);
  static void ATExecAddColumn(AlteredTableInfo *tab, Relation rel,
***************
*** 284,297 ****
  				 DropBehavior behavior,
  				 bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild);
  static void ATExecAddConstraint(List **wqueue,
  								AlteredTableInfo *tab, Relation rel,
! 								Node *newConstraint, bool recurse);
  static void ATAddCheckConstraint(List **wqueue,
  								 AlteredTableInfo *tab, Relation rel,
  								 Constraint *constr,
! 								 bool recurse, bool recursing);
  static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  						  FkConstraint *fkconstraint);
  static void ATExecDropConstraint(Relation rel, const char *constrName,
--- 284,299 ----
  				 DropBehavior behavior,
  				 bool recurse, bool recursing);
  static void ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild, bool needFullLock);
  static void ATExecAddConstraint(List **wqueue,
  								AlteredTableInfo *tab, Relation rel,
! 								Node *newConstraint, bool recurse, 
! 								bool needFullLock);
  static void ATAddCheckConstraint(List **wqueue,
  								 AlteredTableInfo *tab, Relation rel,
  								 Constraint *constr,
! 								 bool recurse, bool recursing, 
! 								 bool needFullLock);
  static void ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
  						  FkConstraint *fkconstraint);
  static void ATExecDropConstraint(Relation rel, const char *constrName,
***************
*** 2124,2131 ****
   * Disallow ALTER TABLE (and similar commands) when the current backend has
   * any open reference to the target table besides the one just acquired by
   * the calling command; this implies there's an open cursor or active plan.
!  * We need this check because our AccessExclusiveLock doesn't protect us
!  * against stomping on our own foot, only other people's feet!
   *
   * For ALTER TABLE, the only case known to cause serious trouble is ALTER
   * COLUMN TYPE, and some changes are obviously pretty benign, so this could
--- 2126,2133 ----
   * Disallow ALTER TABLE (and similar commands) when the current backend has
   * any open reference to the target table besides the one just acquired by
   * the calling command; this implies there's an open cursor or active plan.
!  * We need this check because our lock doesn't protect us against stomping
!  * on our own foot, only other people's feet!
   *
   * For ALTER TABLE, the only case known to cause serious trouble is ALTER
   * COLUMN TYPE, and some changes are obviously pretty benign, so this could
***************
*** 2202,2212 ****
   *
   * Thanks to the magic of MVCC, an error anywhere along the way rolls back
   * the whole operation; we don't have to do anything special to clean up.
   */
  void
  AlterTable(AlterTableStmt *stmt)
  {
! 	Relation	rel = relation_openrv(stmt->relation, AccessExclusiveLock);
  
  	CheckTableNotInUse(rel, "ALTER TABLE");
  
--- 2204,2231 ----
   *
   * Thanks to the magic of MVCC, an error anywhere along the way rolls back
   * the whole operation; we don't have to do anything special to clean up.
+  *
+  * We lock the table as the first action, with an appropriate lock level
+  * for the subcommands requested. Any subcommand that needs to rewrite
+  * tuples in the table forces the whole command to be executed with
+  * AccessExclusiveLock. If none of the subcommands requires a table rewrite
+  * then we are able to use just ShareLock. We pass the lock level down
+  * so that we can apply it recursively to inherited tables. Note that the
+  * lock level we want as we recurse may well be higher than required for 
+  * that specific subcommand. So we pass down the overall lock requirement, 
+  * rather than reassess it at lower levels.
   */
  void
  AlterTable(AlterTableStmt *stmt)
  {
! 	Relation	rel;
! 	bool		needFullLock = AlterTableLockLevel(stmt->cmds);
! 
! 	/*
! 	 * Acquire same level of lock as already acquired during parsing.
! 	 */
! 	rel = relation_openrv(stmt->relation, 
! 							needFullLock ? AccessExclusiveLock : ShareLock);
  
  	CheckTableNotInUse(rel, "ALTER TABLE");
  
***************
*** 2248,2254 ****
  			elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
  	}
  
! 	ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt));
  }
  
  /*
--- 2267,2274 ----
  			elog(ERROR, "unrecognized object type: %d", (int) stmt->relkind);
  	}
  
! 	ATController(rel, stmt->cmds, interpretInhOption(stmt->relation->inhOpt), 
! 						needFullLock);
  }
  
  /*
***************
*** 2265,2277 ****
  void
  AlterTableInternal(Oid relid, List *cmds, bool recurse)
  {
! 	Relation	rel = relation_open(relid, AccessExclusiveLock);
  
! 	ATController(rel, cmds, recurse);
  }
  
  static void
! ATController(Relation rel, List *cmds, bool recurse)
  {
  	List	   *wqueue = NIL;
  	ListCell   *lcmd;
--- 2285,2406 ----
  void
  AlterTableInternal(Oid relid, List *cmds, bool recurse)
  {
! 	Relation	rel;
! 
! 	rel = relation_open(relid, ShareLock);
! 
! 	ATController(rel, cmds, recurse, false);
! }
! 
! /*
!  * AlterTableLockLevel
!  *
!  * Sets the overall lock level required for the supplied list of subcommands.
!  * Policy for doing this set according to needs of AlterTable(), see
!  * comments there for overall explanation. 
!  *
!  * Returns true if we need AccessExclusiveLock, false means ShareLock.
!  *
!  * Function is called before and after parsing, so it must give same
!  * answer each time it is called. Some subcommands are transformed
!  * into other subcommand types, so the transform must never be made to a
!  * lower lock level than previously assigned. All transforms are noted below.
!  */
! bool
! AlterTableLockLevel(List *cmds)
! {
! 	ListCell   *lcmd;
! 	bool		needFullLock = false;
! 
! 	foreach(lcmd, cmds)
! 	{
! 		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
! 
! 		switch (cmd->subtype)
! 		{
! 			/*
! 			 * Need AccessExclusiveLock for these subcommands.
! 			 */
! 			case AT_AddColumn:			/* must rewrite heap */
! 			case AT_DropColumn:			/* change visible to SELECT */
! 			case AT_AlterColumnType:	/* must rewrite heap */
! 			case AT_DropConstraint:		/* as DROP INDEX */
! 			case AT_DropOids:			/* calls AT_DropColumn */
! 			case AT_EnableAlwaysRule:	/* as DROP INDEX */
! 			case AT_EnableReplicaRule:	/* as DROP INDEX */
! 			case AT_EnableRule:			/* as DROP INDEX */
! 			case AT_DisableRule:		/* as DROP INDEX */
! 			case AT_AddInherit:			/* change visible to SELECT */
! 			case AT_DropInherit:		/* change visible to SELECT */
! 			case AT_ChangeOwner:		/* change visible to SELECT */
! 			case AT_SetTableSpace:		/* must rewrite heap */
! 			case AT_DropNotNull:		/* may change some SQL plans */
! 				needFullLock = true;
! 				break;
! 
! 			/*
! 			 * XXX Potentially ShareLocks, with the correct catalog locking
! 			 */
! 			case AT_ColumnDefault:
! 			case AT_SetNotNull:
! 			case AT_SetStatistics:
! 			case AT_SetStorage:				
! 			case AT_ProcessedConstraint:	/* becomes AT_AddConstraint */
! 			case AT_AddConstraintRecurse:	/* becomes AT_AddConstraint */
! 			case AT_ClusterOn:
! 			case AT_DropCluster:
! 			case AT_EnableTrig:
! 			case AT_EnableAlwaysTrig:
! 			case AT_EnableReplicaTrig:
! 			case AT_EnableTrigAll:
! 			case AT_EnableTrigUser:
! 			case AT_DisableTrig:
! 			case AT_DisableTrigAll:
! 			case AT_DisableTrigUser:
! 			case AT_SetRelOptions:
! 			case AT_ResetRelOptions:
! 				needFullLock = true;
! 				break;
  
! 			/*
! 			 * Only ADD UNIQUE, ADD PRIMARY KEY and ADD FOREIGN KEY can use
! 			 * ShareLocks currently.
! 			 */
! 			case AT_AddConstraint:			/* as CREATE INDEX */
! 				if (IsA(cmd->def, Constraint))
! 				{
! 					Constraint *con = (Constraint *) cmd->def;
! 
! 					if (con->contype != CONSTR_PRIMARY &&
! 						con->contype != CONSTR_UNIQUE)
! 						needFullLock = true;
! 				}
! 				else if (IsA(cmd->def, FkConstraint))
! 					;	/* ShareLock suffices; never lower an earlier setting */
! 				else
! 					needFullLock = true;
! 				break;
! 
! 			/*
! 			 * ShareLock is all we need for these subcommands.
! 			 * Be very careful about moving commands onto this list. If
! 			 * you are unsure, place subcommand above, not here.
! 			 */
! 			case AT_AddIndex:				/* from ADD CONSTRAINT */
! 				break;
! 
! 			default:				/* oops */
! 				elog(ERROR, "unrecognized alter table type: %d",
! 					 (int) cmd->subtype);
! 				break;
! 		}
! 	}
! 
! 	return needFullLock;
  }
  
  static void
! ATController(Relation rel, List *cmds, bool recurse, bool needFullLock)
  {
  	List	   *wqueue = NIL;
  	ListCell   *lcmd;
***************
*** 2281,2294 ****
  	{
  		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
  
! 		ATPrepCmd(&wqueue, rel, cmd, recurse, false);
  	}
  
  	/* Close the relation, but keep lock until commit */
  	relation_close(rel, NoLock);
  
  	/* Phase 2: update system catalogs */
! 	ATRewriteCatalogs(&wqueue);
  
  	/* Phase 3: scan/rewrite tables as needed */
  	ATRewriteTables(&wqueue);
--- 2410,2423 ----
  	{
  		AlterTableCmd *cmd = (AlterTableCmd *) lfirst(lcmd);
  
! 		ATPrepCmd(&wqueue, rel, cmd, recurse, false, needFullLock);
  	}
  
  	/* Close the relation, but keep lock until commit */
  	relation_close(rel, NoLock);
  
  	/* Phase 2: update system catalogs */
! 	ATRewriteCatalogs(&wqueue, needFullLock);
  
  	/* Phase 3: scan/rewrite tables as needed */
  	ATRewriteTables(&wqueue);
***************
*** 2300,2311 ****
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
   * recursion and permission checks.
   *
!  * Caller must have acquired AccessExclusiveLock on relation already.
   * This lock should be held until commit.
   */
  static void
  ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing)
  {
  	AlteredTableInfo *tab;
  	int			pass;
--- 2429,2440 ----
   * Traffic cop for ALTER TABLE Phase 1 operations, including simple
   * recursion and permission checks.
   *
!  * Caller must have acquired appropriate lock type on relation already.
   * This lock should be held until commit.
   */
  static void
  ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
! 		  bool recurse, bool recursing, bool needFullLock)
  {
  	AlteredTableInfo *tab;
  	int			pass;
***************
*** 2342,2372 ****
  			 * rules.
  			 */
  			ATSimplePermissions(rel, true);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* Performs own permission checks */
  			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
--- 2471,2501 ----
  			 * rules.
  			 */
  			ATSimplePermissions(rel, true);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = cmd->def ? AT_PASS_ADD_CONSTR : AT_PASS_DROP;
  			break;
  		case AT_DropNotNull:	/* ALTER COLUMN DROP NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_DROP;
  			break;
  		case AT_SetNotNull:		/* ALTER COLUMN SET NOT NULL */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_ADD_CONSTR;
  			break;
  		case AT_SetStatistics:	/* ALTER COLUMN STATISTICS */
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* Performs own permission checks */
  			ATPrepSetStatistics(rel, cmd->name, cmd->def);
  			pass = AT_PASS_COL_ATTRS;
  			break;
  		case AT_SetStorage:		/* ALTER COLUMN STORAGE */
  			ATSimplePermissions(rel, false);
! 			ATSimpleRecursion(wqueue, rel, cmd, recurse, needFullLock);
  			/* No command-specific prep needed */
  			pass = AT_PASS_COL_ATTRS;
  			break;
***************
*** 2428,2434 ****
  				dropCmd->subtype = AT_DropColumn;
  				dropCmd->name = pstrdup("oid");
  				dropCmd->behavior = cmd->behavior;
! 				ATPrepCmd(wqueue, rel, dropCmd, recurse, false);
  			}
  			pass = AT_PASS_DROP;
  			break;
--- 2557,2563 ----
  				dropCmd->subtype = AT_DropColumn;
  				dropCmd->name = pstrdup("oid");
  				dropCmd->behavior = cmd->behavior;
! 				ATPrepCmd(wqueue, rel, dropCmd, recurse, false, needFullLock);
  			}
  			pass = AT_PASS_DROP;
  			break;
***************
*** 2483,2489 ****
   * conflicts).
   */
  static void
! ATRewriteCatalogs(List **wqueue)
  {
  	int			pass;
  	ListCell   *ltab;
--- 2612,2618 ----
   * conflicts).
   */
  static void
! ATRewriteCatalogs(List **wqueue, bool needFullLock)
  {
  	int			pass;
  	ListCell   *ltab;
***************
*** 2508,2520 ****
  			if (subcmds == NIL)
  				continue;
  
! 			/*
! 			 * Exclusive lock was obtained by phase 1, needn't get it again
  			 */
  			rel = relation_open(tab->relid, NoLock);
  
  			foreach(lcmd, subcmds)
! 				ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd));
  
  			/*
  			 * After the ALTER TYPE pass, do cleanup work (this is not done in
--- 2637,2649 ----
  			if (subcmds == NIL)
  				continue;
  
! 			/* 
! 			 * Appropriate lock was obtained by phase 1, needn't get it again
  			 */
  			rel = relation_open(tab->relid, NoLock);
  
  			foreach(lcmd, subcmds)
! 				ATExecCmd(wqueue, tab, rel, (AlterTableCmd *) lfirst(lcmd), needFullLock);
  
  			/*
  			 * After the ALTER TYPE pass, do cleanup work (this is not done in
***************
*** 2549,2555 ****
   */
  static void
  ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 		  AlterTableCmd *cmd)
  {
  	switch (cmd->subtype)
  	{
--- 2678,2684 ----
   */
  static void
  ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 		  AlterTableCmd *cmd, bool needFullLock)
  {
  	switch (cmd->subtype)
  	{
***************
*** 2578,2593 ****
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false);
  			break;
  		case AT_ReAddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true);
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, false);
  			break;
  		case AT_AddConstraintRecurse:	/* ADD CONSTRAINT with recursion */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, true);
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
--- 2707,2722 ----
  			ATExecDropColumn(rel, cmd->name, cmd->behavior, true, false);
  			break;
  		case AT_AddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, false, needFullLock);
  			break;
  		case AT_ReAddIndex:		/* ADD INDEX */
! 			ATExecAddIndex(tab, rel, (IndexStmt *) cmd->def, true, needFullLock);
  			break;
  		case AT_AddConstraint:	/* ADD CONSTRAINT */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, false, needFullLock);
  			break;
  		case AT_AddConstraintRecurse:	/* ADD CONSTRAINT with recursion */
! 			ATExecAddConstraint(wqueue, tab, rel, cmd->def, true, needFullLock);
  			break;
  		case AT_DropConstraint:	/* DROP CONSTRAINT */
  			ATExecDropConstraint(rel, cmd->name, cmd->behavior, false, false);
***************
*** 2725,2730 ****
--- 2854,2860 ----
  			ObjectAddress object;
  
  			OldHeap = heap_open(tab->relid, NoLock);
+ 			Assert(RelationLockedByMe(OldHeap, AccessExclusiveLock));
  
  			/*
  			 * We can never allow rewriting of shared or nailed-in-cache
***************
*** 2847,2858 ****
  				Relation	refrel;
  
  				if (rel == NULL)
- 				{
  					/* Long since locked, no need for another */
  					rel = heap_open(tab->relid, NoLock);
- 				}
  
! 				refrel = heap_open(con->refrelid, RowShareLock);
  
  				validateForeignKeyConstraint(fkconstraint, rel, refrel,
  											 con->conid);
--- 2977,2992 ----
  				Relation	refrel;
  
  				if (rel == NULL)
  					/* Long since locked, no need for another */
  					rel = heap_open(tab->relid, NoLock);
  
! 				/*
! 				 * We need to prevent write access to table while
! 				 * we make the check. RowShareLock is needed if we
! 				 * use triggers, but we want to avoid using
! 				 * row-level locks if we can.
! 				 */
! 				refrel = heap_open(con->refrelid, ShareLock);
  
  				validateForeignKeyConstraint(fkconstraint, rel, refrel,
  											 con->conid);
***************
*** 2893,2901 ****
--- 3027,3041 ----
  	newTupDesc = RelationGetDescr(oldrel);		/* includes all mods */
  
  	if (OidIsValid(OIDNewHeap))
+ 	{
  		newrel = heap_open(OIDNewHeap, AccessExclusiveLock);
+ 		Assert(RelationLockedByMe(oldrel, AccessExclusiveLock));
+ 	}
  	else
+ 	{
  		newrel = NULL;
+ 		Assert(RelationLockedByMe(oldrel, ShareLock));
+ 	}
  
  	/*
  	 * If we need to rewrite the table, the operation has to be propagated to
***************
*** 3230,3236 ****
   */
  static void
  ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse)
  {
  	/*
  	 * Propagate to children if desired.  Non-table relations never have
--- 3370,3376 ----
   */
  static void
  ATSimpleRecursion(List **wqueue, Relation rel,
! 				  AlterTableCmd *cmd, bool recurse, bool needFullLock)
  {
  	/*
  	 * Propagate to children if desired.  Non-table relations never have
***************
*** 3257,3265 ****
  
  			if (childrelid == relid)
  				continue;
! 			childrel = relation_open(childrelid, AccessExclusiveLock);
  			CheckTableNotInUse(childrel, "ALTER TABLE");
! 			ATPrepCmd(wqueue, childrel, cmd, false, true);
  			relation_close(childrel, NoLock);
  		}
  	}
--- 3397,3408 ----
  
  			if (childrelid == relid)
  				continue;
! 
! 			childrel = relation_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  			CheckTableNotInUse(childrel, "ALTER TABLE");
! 			ATPrepCmd(wqueue, childrel, cmd, false, true, needFullLock);
  			relation_close(childrel, NoLock);
  		}
  	}
***************
*** 3275,3281 ****
   */
  static void
  ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd)
  {
  	Oid			relid = RelationGetRelid(rel);
  	ListCell   *child;
--- 3418,3424 ----
   */
  static void
  ATOneLevelRecursion(List **wqueue, Relation rel,
! 					AlterTableCmd *cmd, bool needFullLock)
  {
  	Oid			relid = RelationGetRelid(rel);
  	ListCell   *child;
***************
*** 3289,3297 ****
  		Oid			childrelid = lfirst_oid(child);
  		Relation	childrel;
  
! 		childrel = relation_open(childrelid, AccessExclusiveLock);
  		CheckTableNotInUse(childrel, "ALTER TABLE");
! 		ATPrepCmd(wqueue, childrel, cmd, true, true);
  		relation_close(childrel, NoLock);
  	}
  }
--- 3432,3442 ----
  		Oid			childrelid = lfirst_oid(child);
  		Relation	childrel;
  
! 		childrel = relation_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  		CheckTableNotInUse(childrel, "ALTER TABLE");
! 		ATPrepCmd(wqueue, childrel, cmd, true, true, needFullLock);
  		relation_close(childrel, NoLock);
  	}
  }
***************
*** 3427,3433 ****
  		colDefChild->inhcount = 1;
  		colDefChild->is_local = false;
  
! 		ATOneLevelRecursion(wqueue, rel, childCmd);
  	}
  	else
  	{
--- 3572,3578 ----
  		colDefChild->inhcount = 1;
  		colDefChild->is_local = false;
  
! 		ATOneLevelRecursion(wqueue, rel, childCmd, true);
  	}
  	else
  	{
***************
*** 4235,4241 ****
   */
  static void
  ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild)
  {
  	bool		check_rights;
  	bool		skip_build;
--- 4380,4386 ----
   */
  static void
  ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
! 			   IndexStmt *stmt, bool is_rebuild, bool needFullLock)
  {
  	bool		check_rights;
  	bool		skip_build;
***************
*** 4275,4281 ****
   */
  static void
  ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					Node *newConstraint, bool recurse)
  {
  	switch (nodeTag(newConstraint))
  	{
--- 4420,4426 ----
   */
  static void
  ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					Node *newConstraint, bool recurse, bool needFullLock)
  {
  	switch (nodeTag(newConstraint))
  	{
***************
*** 4293,4299 ****
  				{
  					case CONSTR_CHECK:
  						ATAddCheckConstraint(wqueue, tab, rel,
! 											 constr, recurse, false);
  						break;
  					default:
  						elog(ERROR, "unrecognized constraint type: %d",
--- 4438,4444 ----
  				{
  					case CONSTR_CHECK:
  						ATAddCheckConstraint(wqueue, tab, rel,
! 											 constr, recurse, false, needFullLock);
  						break;
  					default:
  						elog(ERROR, "unrecognized constraint type: %d",
***************
*** 4356,4362 ****
   */
  static void
  ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					 Constraint *constr, bool recurse, bool recursing)
  {
  	List	   *newcons;
  	ListCell   *lcon;
--- 4501,4508 ----
   */
  static void
  ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
! 					 Constraint *constr, bool recurse, bool recursing,
! 					 bool needFullLock)
  {
  	List	   *newcons;
  	ListCell   *lcon;
***************
*** 4428,4434 ****
  		Relation	childrel;
  		AlteredTableInfo *childtab;
  
! 		childrel = heap_open(childrelid, AccessExclusiveLock);
  		CheckTableNotInUse(childrel, "ALTER TABLE");
  
  		/* Find or create work queue entry for this table */
--- 4574,4582 ----
  		Relation	childrel;
  		AlteredTableInfo *childtab;
  
! 		childrel = heap_open(childrelid, 
! 						needFullLock ? AccessExclusiveLock : ShareLock);
! 
  		CheckTableNotInUse(childrel, "ALTER TABLE");
  
  		/* Find or create work queue entry for this table */
***************
*** 4436,4442 ****
  
  		/* Recurse to child */
  		ATAddCheckConstraint(wqueue, childtab, childrel,
! 							 constr, recurse, true);
  
  		heap_close(childrel, NoLock);
  	}
--- 4584,4590 ----
  
  		/* Recurse to child */
  		ATAddCheckConstraint(wqueue, childtab, childrel,
! 							 constr, recurse, true, needFullLock);
  
  		heap_close(childrel, NoLock);
  	}
***************
*** 4470,4482 ****
  	Oid			constrOid;
  
  	/*
! 	 * Grab an exclusive lock on the pk table, so that someone doesn't delete
! 	 * rows out from under us. (Although a lesser lock would do for that
! 	 * purpose, we'll need exclusive lock anyway to add triggers to the pk
! 	 * table; trying to start with a lesser lock will just create a risk of
! 	 * deadlock.)
  	 */
! 	pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
  
  	/*
  	 * Validity and permissions checks
--- 4618,4629 ----
  	Oid			constrOid;
  
  	/*
! 	 * Grab a ShareLock on the pk table, so that someone doesn't delete
! 	 * rows out from under us. We can't take a lower level of lock, since
! 	 * we need ShareLocks to add triggers for the constraint. (Remember: we
! 	 * add triggers to *both* tables when we add a Foreign Key).
  	 */
! 	pkrel = heap_openrv(fkconstraint->pktable, ShareLock);
  
  	/*
  	 * Validity and permissions checks
***************
*** 4993,4999 ****
   * Scan the existing rows in a table to verify they meet a proposed FK
   * constraint.
   *
!  * Caller must have opened and locked both relations.
   */
  static void
  validateForeignKeyConstraint(FkConstraint *fkconstraint,
--- 5140,5146 ----
   * Scan the existing rows in a table to verify they meet a proposed FK
   * constraint.
   *
!  * Caller must have opened and locked both relations at ShareLock or higher.
   */
  static void
  validateForeignKeyConstraint(FkConstraint *fkconstraint,
***************
*** 5541,5547 ****
  	 * alter would put them out of step.
  	 */
  	if (recurse)
! 		ATSimpleRecursion(wqueue, rel, cmd, recurse);
  	else if (!recursing &&
  			 find_inheritance_children(RelationGetRelid(rel)) != NIL)
  		ereport(ERROR,
--- 5688,5694 ----
  	 * alter would put them out of step.
  	 */
  	if (recurse)
! 		ATSimpleRecursion(wqueue, rel, cmd, recurse, true);
  	else if (!recursing &&
  			 find_inheritance_children(RelationGetRelid(rel)) != NIL)
  		ereport(ERROR,
***************
*** 5938,5943 ****
--- 6085,6094 ----
  	 */
  }
  
+ /*
+  * Called to add additional work items if ALTER TYPE touched indexed
+  * columns. Hence we always take AccessExclusiveLocks here.
+  */
  static void
  ATPostAlterTypeParse(char *cmd, List **wqueue)
  {
Index: src/backend/commands/trigger.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/commands/trigger.c,v
retrieving revision 1.237
diff -c -r1.237 trigger.c
*** src/backend/commands/trigger.c	1 Sep 2008 20:42:44 -0000	1.237
--- src/backend/commands/trigger.c	7 Oct 2008 10:15:25 -0000
***************
*** 95,101 ****
  	Oid			funcoid;
  	Oid			funcrettype;
  	Oid			trigoid;
- 	int			found = 0;
  	int			i;
  	char		constrtrigname[NAMEDATALEN];
  	char	   *trigname;
--- 95,100 ----
***************
*** 103,110 ****
  	Oid			constrrelid = InvalidOid;
  	ObjectAddress myself,
  				referenced;
  
! 	rel = heap_openrv(stmt->relation, AccessExclusiveLock);
  
  	if (rel->rd_rel->relkind != RELKIND_RELATION)
  		ereport(ERROR,
--- 102,115 ----
  	Oid			constrrelid = InvalidOid;
  	ObjectAddress myself,
  				referenced;
+ 	Form_pg_class rd_rel;
  
! 	/*
! 	 * ShareLock is sufficient to prevent concurrent write activity
! 	 * to relation. If we had SELECT Triggers we would need to take
! 	 * an AccessExclusiveLock, just as we do with ON SELECT Rules.
! 	 */
! 	rel = heap_openrv(stmt->relation, ShareLock);
  
  	if (rel->rd_rel->relkind != RELKIND_RELATION)
  		ereport(ERROR,
***************
*** 280,292 ****
  	}
  
  	/*
! 	 * Scan pg_trigger for existing triggers on relation.  We do this mainly
! 	 * because we must count them; a secondary benefit is to give a nice error
! 	 * message if there's already a trigger of the same name. (The unique
! 	 * index on tgrelid/tgname would complain anyway.)
  	 *
! 	 * NOTE that this is cool only because we have AccessExclusiveLock on the
! 	 * relation, so the trigger set won't be changing underneath us.
  	 */
  	ScanKeyInit(&key,
  				Anum_pg_trigger_tgrelid,
--- 285,297 ----
  	}
  
  	/*
! 	 * Scan pg_trigger for existing triggers on relation.  We do this 
! 	 * because we want to give a nice error message if there's already a 
! 	 * trigger of the same name. (The unique index on tgrelid/tgname would 
! 	 * complain anyway.)
  	 *
! 	 * We don't have AccessExclusiveLock, so it's possible for us to succeed
! 	 * here then later fail when we insert the trigger. We tried.
  	 */
  	ScanKeyInit(&key,
  				Anum_pg_trigger_tgrelid,
***************
*** 303,309 ****
  					(errcode(ERRCODE_DUPLICATE_OBJECT),
  				  errmsg("trigger \"%s\" for relation \"%s\" already exists",
  						 trigname, stmt->relation->relname)));
- 		found++;
  	}
  	systable_endscan(tgscan);
  
--- 308,313 ----
***************
*** 405,415 ****
  		elog(ERROR, "cache lookup failed for relation %u",
  			 RelationGetRelid(rel));
  
! 	((Form_pg_class) GETSTRUCT(tuple))->reltriggers = found + 1;
  
! 	simple_heap_update(pgrel, &tuple->t_self, tuple);
! 
! 	CatalogUpdateIndexes(pgrel, tuple);
  
  	heap_freetuple(tuple);
  	heap_close(pgrel, RowExclusiveLock);
--- 409,432 ----
  		elog(ERROR, "cache lookup failed for relation %u",
  			 RelationGetRelid(rel));
  
! 	rd_rel = (Form_pg_class) GETSTRUCT(tuple);
  
! 	/*
! 	 * If anything changed, write out the tuple using non-transactional
! 	 * update, allowing us to have concurrent CREATE TRIGGER. See lengthy
! 	 * comments in index_update_stats().
! 	 */
! 	if (rd_rel->reltriggers == 0)
! 	{
! 		rd_rel->reltriggers = 1;
! 		heap_inplace_update(pgrel, tuple);
! 		/* the above sends a cache inval message */
! 	}
! 	else
! 	{
! 		/* no need to change tuple, but force relcache inval anyway */
! 		CacheInvalidateRelcacheByTuple(tuple);
! 	}
  
  	heap_freetuple(tuple);
  	heap_close(pgrel, RowExclusiveLock);
***************
*** 870,878 ****
  	 * Update relation's pg_class entry.  Crucial side-effect: other backends
  	 * (and this one too!) are sent SI message to make them rebuild relcache
  	 * entries.
- 	 *
- 	 * Note this is OK only because we have AccessExclusiveLock on the rel, so
- 	 * no one else is creating/deleting triggers on this rel at the same time.
  	 */
  	pgrel = heap_open(RelationRelationId, RowExclusiveLock);
  	tuple = SearchSysCacheCopy(RELOID,
--- 887,892 ----
***************
*** 882,892 ****
  		elog(ERROR, "cache lookup failed for relation %u", relid);
  	classForm = (Form_pg_class) GETSTRUCT(tuple);
  
- 	if (classForm->reltriggers == 0)	/* should not happen */
- 		elog(ERROR, "relation \"%s\" has reltriggers = 0",
- 			 RelationGetRelationName(rel));
- 	classForm->reltriggers--;
- 
  	simple_heap_update(pgrel, &tuple->t_self, tuple);
  
  	CatalogUpdateIndexes(pgrel, tuple);
--- 896,901 ----
***************
*** 1134,1151 ****
  RelationBuildTriggers(Relation relation)
  {
  	TriggerDesc *trigdesc;
! 	int			ntrigs = relation->rd_rel->reltriggers;
! 	Trigger    *triggers;
! 	int			found = 0;
  	Relation	tgrel;
  	ScanKeyData skey;
  	SysScanDesc tgscan;
  	HeapTuple	htup;
  	MemoryContext oldContext;
  
! 	Assert(ntrigs > 0);			/* else I should not have been called */
! 
! 	triggers = (Trigger *) palloc(ntrigs * sizeof(Trigger));
  
  	/*
  	 * Note: since we scan the triggers using TriggerRelidNameIndexId, we will
--- 1143,1166 ----
  RelationBuildTriggers(Relation relation)
  {
  	TriggerDesc *trigdesc;
! 	int			numtrigs;
! 	int			maxtrigs;
! 	Trigger    **triggers;
! 	Trigger		*trigarray;
! 	int			i;
  	Relation	tgrel;
  	ScanKeyData skey;
  	SysScanDesc tgscan;
  	HeapTuple	htup;
  	MemoryContext oldContext;
  
! 	/*
! 	 * allocate an array to hold the triggers (the array is extended if
! 	 * necessary)
! 	 */
! 	maxtrigs = 4;
! 	triggers = (Trigger **) palloc(sizeof(Trigger *) * maxtrigs);
! 	numtrigs = 0;
  
  	/*
  	 * Note: since we scan the triggers using TriggerRelidNameIndexId, we will
***************
*** 1167,1176 ****
  		Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
  		Trigger    *build;
  
! 		if (found >= ntrigs)
! 			elog(ERROR, "too many trigger records found for relation \"%s\"",
! 				 RelationGetRelationName(relation));
! 		build = &(triggers[found]);
  
  		build->tgoid = HeapTupleGetOid(htup);
  		build->tgname = DatumGetCString(DirectFunctionCall1(nameout,
--- 1182,1188 ----
  		Form_pg_trigger pg_trigger = (Form_pg_trigger) GETSTRUCT(htup);
  		Trigger    *build;
  
! 		build = (Trigger *) palloc(sizeof(Trigger));
  
  		build->tgoid = HeapTupleGetOid(htup);
  		build->tgname = DatumGetCString(DirectFunctionCall1(nameout,
***************
*** 1199,1205 ****
  			bytea	   *val;
  			bool		isnull;
  			char	   *p;
- 			int			i;
  
  			val = DatumGetByteaP(fastgetattr(htup,
  											 Anum_pg_trigger_tgargs,
--- 1211,1216 ----
***************
*** 1218,1240 ****
  		else
  			build->tgargs = NULL;
  
! 		found++;
  	}
  
  	systable_endscan(tgscan);
  	heap_close(tgrel, AccessShareLock);
  
! 	if (found != ntrigs)
! 		elog(ERROR, "%d trigger record(s) not found for relation \"%s\"",
! 			 ntrigs - found,
! 			 RelationGetRelationName(relation));
  
  	/* Build trigdesc */
  	trigdesc = (TriggerDesc *) palloc0(sizeof(TriggerDesc));
! 	trigdesc->triggers = triggers;
! 	trigdesc->numtriggers = ntrigs;
! 	for (found = 0; found < ntrigs; found++)
! 		InsertTrigger(trigdesc, &(triggers[found]), found);
  
  	/* Copy completed trigdesc into cache storage */
  	oldContext = MemoryContextSwitchTo(CacheMemoryContext);
--- 1229,1266 ----
  		else
  			build->tgargs = NULL;
  
! 		if (numtrigs >= maxtrigs)
! 		{
! 			maxtrigs *= 2;
! 			triggers = (Trigger **) repalloc(triggers, sizeof(Trigger *) * maxtrigs);
! 		}
! 		triggers[numtrigs++] = build;
  	}
  
  	systable_endscan(tgscan);
  	heap_close(tgrel, AccessShareLock);
  
! 	if (numtrigs == 0)
! 	{
! 		pfree(triggers);
! 		return;
! 	}
  
  	/* Build trigdesc */
  	trigdesc = (TriggerDesc *) palloc0(sizeof(TriggerDesc));
!  	trigarray = (Trigger *) palloc(numtrigs * sizeof(Trigger));
! 
! 	for (i = 0; i < numtrigs; i++)
! 	{
! 		memcpy(&(trigarray[i]), triggers[i], sizeof(Trigger));
! 		pfree(triggers[i]);
! 	}
! 	pfree(triggers);
! 
!  	trigdesc->triggers = trigarray;
! 	trigdesc->numtriggers = numtrigs;
! 	for (i = 0; i < numtrigs; i++)
! 		InsertTrigger(trigdesc, &(trigarray[i]), i);
  
  	/* Copy completed trigdesc into cache storage */
  	oldContext = MemoryContextSwitchTo(CacheMemoryContext);
Index: src/backend/parser/parse_utilcmd.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/parser/parse_utilcmd.c,v
retrieving revision 2.17
diff -c -r2.17 parse_utilcmd.c
*** src/backend/parser/parse_utilcmd.c	1 Sep 2008 20:42:45 -0000	2.17
--- src/backend/parser/parse_utilcmd.c	7 Oct 2008 10:14:53 -0000
***************
*** 1292,1298 ****
  }
  
  /*
!  * transformIndexStmt - parse analysis for CREATE INDEX
   *
   * Note: this is a no-op for an index not using either index expressions or
   * a predicate expression.	There are several code paths that create indexes
--- 1292,1298 ----
  }
  
  /*
!  * transformIndexStmt - parse analysis for CREATE INDEX and ALTER TABLE
   *
   * Note: this is a no-op for an index not using either index expressions or
   * a predicate expression.	There are several code paths that create indexes
***************
*** 1318,1324 ****
  	 * because addRangeTableEntry() would acquire only AccessShareLock,
  	 * leaving DefineIndex() needing to do a lock upgrade with consequent risk
  	 * of deadlock.  Make sure this stays in sync with the type of lock
! 	 * DefineIndex() wants.
  	 */
  	rel = heap_openrv(stmt->relation,
  				  (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
--- 1318,1325 ----
  	 * because addRangeTableEntry() would acquire only AccessShareLock,
  	 * leaving DefineIndex() needing to do a lock upgrade with consequent risk
  	 * of deadlock.  Make sure this stays in sync with the type of lock
! 	 * DefineIndex() wants. If we are being called by ALTER TABLE, we may
! 	 * already hold a higher lock...
  	 */
  	rel = heap_openrv(stmt->relation,
  				  (stmt->concurrent ? ShareUpdateExclusiveLock : ShareLock));
***************
*** 1674,1679 ****
--- 1675,1681 ----
  	List	   *newcmds = NIL;
  	bool		skipValidation = true;
  	AlterTableCmd *newcmd;
+ 	bool		needFullLock;
  
  	/*
  	 * We must not scribble on the passed-in AlterTableStmt, so copy it. (This
***************
*** 1682,1694 ****
  	stmt = (AlterTableStmt *) copyObject(stmt);
  
  	/*
! 	 * Acquire exclusive lock on the target relation, which will be held until
  	 * end of transaction.	This ensures any decisions we make here based on
  	 * the state of the relation will still be good at execution. We must get
! 	 * exclusive lock now because execution will; taking a lower grade lock
! 	 * now and trying to upgrade later risks deadlock.
  	 */
! 	rel = relation_openrv(stmt->relation, AccessExclusiveLock);
  
  	/* Set up pstate */
  	pstate = make_parsestate(NULL);
--- 1684,1703 ----
  	stmt = (AlterTableStmt *) copyObject(stmt);
  
  	/*
! 	 * Assign the appropriate lock level for this list of subcommands.
! 	 */
! 	needFullLock = AlterTableLockLevel(stmt->cmds);
! 
! 	/*
! 	 * Acquire appropriate lock on the target relation, which will be held until
  	 * end of transaction.	This ensures any decisions we make here based on
  	 * the state of the relation will still be good at execution. We must get
! 	 * lock now because execution will; taking a lower grade lock now and 
! 	 * trying to upgrade later risks deadlock.  Any new commands we add
! 	 * after this must not upgrade the lock level requested here.
  	 */
! 	rel = relation_openrv(stmt->relation, 
! 							needFullLock ? AccessExclusiveLock : ShareLock);
  
  	/* Set up pstate */
  	pstate = make_parsestate(NULL);
Index: src/backend/rewrite/rewriteDefine.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/rewrite/rewriteDefine.c,v
retrieving revision 1.129
diff -c -r1.129 rewriteDefine.c
*** src/backend/rewrite/rewriteDefine.c	25 Aug 2008 22:42:34 -0000	1.129
--- src/backend/rewrite/rewriteDefine.c	7 Oct 2008 10:14:53 -0000
***************
*** 236,246 ****
  	/*
  	 * If we are installing an ON SELECT rule, we had better grab
  	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
! 	 * event relation.	For other types of rules, it might be sufficient to
! 	 * grab ShareLock to lock out insert/update/delete actions.  But for now,
! 	 * let's just grab AccessExclusiveLock all the time.
  	 */
! 	event_relation = heap_open(event_relid, AccessExclusiveLock);
  
  	/*
  	 * Check user has permission to apply rules to this relation.
--- 236,248 ----
  	/*
  	 * If we are installing an ON SELECT rule, we had better grab
  	 * AccessExclusiveLock to ensure no SELECTs are currently running on the
! 	 * event relation.	For other types of rules, it is sufficient to
! 	 * grab ShareLock to lock out insert/update/delete actions.
  	 */
! 	if (event_type == CMD_SELECT)
! 		event_relation = heap_open(event_relid, AccessExclusiveLock);
! 	else
! 		event_relation = heap_open(event_relid, ShareLock);
  
  	/*
  	 * Check user has permission to apply rules to this relation.
Index: src/backend/rewrite/rewriteSupport.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/rewrite/rewriteSupport.c,v
retrieving revision 1.66
diff -c -r1.66 rewriteSupport.c
*** src/backend/rewrite/rewriteSupport.c	19 Jun 2008 00:46:05 -0000	1.66
--- src/backend/rewrite/rewriteSupport.c	7 Oct 2008 11:13:29 -0000
***************
*** 73,84 ****
  		/* Do the update */
  		classForm->relhasrules = relHasRules;
  		if (relIsBecomingView)
  			classForm->relkind = RELKIND_VIEW;
  
! 		simple_heap_update(relationRelation, &tuple->t_self, tuple);
  
! 		/* Keep the catalog indexes up to date */
! 		CatalogUpdateIndexes(relationRelation, tuple);
  	}
  	else
  	{
--- 73,88 ----
  		/* Do the update */
  		classForm->relhasrules = relHasRules;
  		if (relIsBecomingView)
+ 		{
  			classForm->relkind = RELKIND_VIEW;
  
! 			simple_heap_update(relationRelation, &tuple->t_self, tuple);
  
! 			/* Keep the catalog indexes up to date */
! 			CatalogUpdateIndexes(relationRelation, tuple);
! 		}
! 		else
! 			heap_inplace_update(relationRelation, tuple);
  	}
  	else
  	{
Index: src/backend/storage/lmgr/lmgr.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/lmgr/lmgr.c,v
retrieving revision 1.97
diff -c -r1.97 lmgr.c
*** src/backend/storage/lmgr/lmgr.c	4 Mar 2008 19:54:06 -0000	1.97
--- src/backend/storage/lmgr/lmgr.c	7 Oct 2008 10:14:53 -0000
***************
*** 181,186 ****
--- 181,198 ----
  		AcceptInvalidationMessages();
  }
  
+ bool
+ RelationLockedByMe(Relation relation, LOCKMODE lockmode)
+ {
+ 	LOCKTAG		tag;
+ 
+ 	SET_LOCKTAG_RELATION(tag,
+ 						 relation->rd_lockInfo.lockRelId.dbId,
+ 						 relation->rd_lockInfo.lockRelId.relId);
+ 
+ 	return LockHeldByMe(&tag, lockmode);
+ }
+ 
  /*
   *		ConditionalLockRelation
   *
Index: src/backend/storage/lmgr/lock.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/storage/lmgr/lock.c,v
retrieving revision 1.184
diff -c -r1.184 lock.c
*** src/backend/storage/lmgr/lock.c	1 Aug 2008 13:16:09 -0000	1.184
--- src/backend/storage/lmgr/lock.c	7 Oct 2008 10:14:53 -0000
***************
*** 438,443 ****
--- 438,473 ----
  	return lockhash;
  }
  
+ bool
+ LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode)
+ {
+ 	LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+ 	LockMethod	lockMethodTable;
+ 	LOCALLOCKTAG localtag;
+ 	LOCALLOCK  *locallock;
+ 	bool		found = false;
+ 	LOCKMODE	lockmode_search = lockmode;
+ 
+ 	Assert(!(lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods)));
+ 	lockMethodTable = LockMethods[lockmethodid];
+ 	Assert(!(lockmode <= 0 || lockmode > lockMethodTable->numLockModes));
+ 
+ 	/*
+ 	 * Look for an existing LOCALLOCK entry at this lockmode or any higher one
+ 	 */
+ 	MemSet(&localtag, 0, sizeof(localtag));		/* must clear padding */
+ 	localtag.lock = *locktag;
+ 
+ 	while (!found && lockmode_search <= lockMethodTable->numLockModes)
+ 	{
+ 		localtag.mode = lockmode_search++;
+ 		locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+ 											  (void *) &localtag,
+ 											  HASH_FIND, &found);
+ 	}
+ 
+ 	return found;
+ }
  
  /*
   * LockAcquire -- Check for lock conflicts, sleep if conflict found,
Index: src/backend/utils/adt/ri_triggers.c
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/backend/utils/adt/ri_triggers.c,v
retrieving revision 1.110
diff -c -r1.110 ri_triggers.c
*** src/backend/utils/adt/ri_triggers.c	15 Sep 2008 23:37:39 -0000	1.110
--- src/backend/utils/adt/ri_triggers.c	7 Oct 2008 10:14:53 -0000
***************
*** 2601,2607 ****
   *	This is not a trigger procedure, but is called during ALTER TABLE
   *	ADD FOREIGN KEY to validate the initial table contents.
   *
!  *	We expect that an exclusive lock has been taken on rel and pkrel;
   *	hence, we do not need to lock individual rows for the check.
   *
   *	If the check fails because the current user doesn't have permissions
--- 2601,2607 ----
   *	This is not a trigger procedure, but is called during ALTER TABLE
   *	ADD FOREIGN KEY to validate the initial table contents.
   *
!  *	We expect that a ShareLock or higher has been taken on rel and pkrel;
   *	hence, we do not need to lock individual rows for the check.
   *
   *	If the check fails because the current user doesn't have permissions
Index: src/include/commands/tablecmds.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/commands/tablecmds.h,v
retrieving revision 1.41
diff -c -r1.41 tablecmds.h
*** src/include/commands/tablecmds.h	19 Jun 2008 00:46:06 -0000	1.41
--- src/include/commands/tablecmds.h	7 Oct 2008 10:14:53 -0000
***************
*** 24,29 ****
--- 24,31 ----
  
  extern void AlterTable(AlterTableStmt *stmt);
  
+ extern bool AlterTableLockLevel(List *cmds);
+ 
  extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing);
  
  extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
Index: src/include/storage/lmgr.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/lmgr.h,v
retrieving revision 1.62
diff -c -r1.62 lmgr.h
*** src/include/storage/lmgr.h	12 May 2008 00:00:53 -0000	1.62
--- src/include/storage/lmgr.h	7 Oct 2008 10:14:53 -0000
***************
*** 32,37 ****
--- 32,39 ----
  extern bool ConditionalLockRelation(Relation relation, LOCKMODE lockmode);
  extern void UnlockRelation(Relation relation, LOCKMODE lockmode);
  
+ extern bool RelationLockedByMe(Relation relation, LOCKMODE lockmode);
+ 
  extern void LockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
  extern void UnlockRelationIdForSession(LockRelId *relid, LOCKMODE lockmode);
  
Index: src/include/storage/lock.h
===================================================================
RCS file: /home/sriggs/pg/REPOSITORY/pgsql/src/include/storage/lock.h,v
retrieving revision 1.114
diff -c -r1.114 lock.h
*** src/include/storage/lock.h	16 Sep 2008 01:56:26 -0000	1.114
--- src/include/storage/lock.h	7 Oct 2008 10:14:53 -0000
***************
*** 467,472 ****
--- 467,473 ----
  extern void InitLocks(void);
  extern LockMethod GetLocksMethodTable(const LOCK *lock);
  extern uint32 LockTagHashCode(const LOCKTAG *locktag);
+ extern bool LockHeldByMe(const LOCKTAG *locktag, LOCKMODE lockmode);
  extern LockAcquireResult LockAcquire(const LOCKTAG *locktag,
  			LOCKMODE lockmode,
  			bool sessionLock,
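
For anyone reading the LockHeldByMe() hunk above without the lock manager source to hand, the search it performs can be sketched outside the backend roughly as follows. This is only an illustration, not the patch code: the held[] array stands in for the LockMethodLocalHash lookup, and mark_held() and lock_held_at_or_above() are hypothetical names. The mode numbers follow the usual 1..8 ordering of the table-level lock modes. The sketch also shows the assumption the loop bakes in, namely that holding any higher-numbered mode counts as holding the requested one; it does not consult the conflict table.

```c
#include <assert.h>
#include <stdbool.h>

/* Stand-ins for the table-level lock modes, numbered as in the backend. */
enum
{
	ACCESS_SHARE_LOCK = 1,
	ROW_SHARE_LOCK,
	ROW_EXCLUSIVE_LOCK,
	SHARE_UPDATE_EXCLUSIVE_LOCK,
	SHARE_LOCK,
	SHARE_ROW_EXCLUSIVE_LOCK,
	EXCLUSIVE_LOCK,
	ACCESS_EXCLUSIVE_LOCK,
	NUM_LOCK_MODES = ACCESS_EXCLUSIVE_LOCK
};

/* Hypothetical per-backend record of held modes for one relation. */
static bool held[NUM_LOCK_MODES + 1];

/* Hypothetical helper: record that this backend holds the given mode. */
static void
mark_held(int lockmode)
{
	assert(lockmode >= 1 && lockmode <= NUM_LOCK_MODES);
	held[lockmode] = true;
}

/*
 * Return true if the requested mode, or any higher-numbered mode, is held.
 * Mirrors the loop in the patch: start at the requested mode and walk
 * upwards until a held entry is found or the modes run out.
 */
static bool
lock_held_at_or_above(int lockmode)
{
	int			mode;

	assert(lockmode >= 1 && lockmode <= NUM_LOCK_MODES);

	for (mode = lockmode; mode <= NUM_LOCK_MODES; mode++)
	{
		if (held[mode])
			return true;
	}
	return false;
}
```

With only SHARE_LOCK marked as held, a query for ROW_EXCLUSIVE_LOCK or SHARE_LOCK reports true, while a query for ACCESS_EXCLUSIVE_LOCK reports false.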
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers
