Hi hackers,

Recently I've been working on a CASCADE option for ALTER ... OWNER TO 
statement. Although it's still a working prototype, I think it's time to share 
my work.


Introduction
============

As of now there's no way to transfer the ownership of an object and all its 
dependent objects in one step. One has to manually alter the owner of each 
object, be it a table, a schema or something else. This patch adds the 
'CASCADE' option to every 'ALTER X OWNER TO' statement, including the 'ALTER 
DATABASE db OWNER TO user CASCADE' which turns out to be a delicate matter.


Implementation
==============

There are two functions that process 'ALTER ... OWNER' statement: 
ExecAlterOwnerStmt() and ATExecCmd(). The latter function deals with the tasks 
that refer to all kinds of relations, while the first one handles the remaining 
object types. Basically, all I had to do is to add 'cascade' flag to the 
corresponding parsenodes and to make these functions call the dependency tree 
walker function (which would change the ownership of the dependent objects if 
needed). Of course, there are various corner cases for each kind of objects 
that require special treatment, but the code speaks for itself.

The aforementioned 'ALTER DATABASE db ...' is handled in a special way. Since 
objects that don't belong to the 'current database' are hidden, it is 
impossible to change their owner directly, so we have to do the job in the 
background worker that is connected to the 'db'. I'm not sure if this is the 
best solution available, but anyway.


What works
==========

Actually, it seems to work in simple cases like 'a table with its inheritors' 
or 'a schema full of tables', but of course there might be things I've 
overlooked. There are some regression tests, though, and I'll continue to 
write some more.


What's dubious
==============

It is unclear what kinds of objects should be transferred in case of database 
ownership change, since there's no way to get the full list of objects that 
depend on a given database. Currently the code changes ownership of all 
schemas (excluding the 'information_schema' and some others) and their 
contents, but this is a temporary limitation.

Feedback is welcome!


-- 
Dmitry Ivanov
Postgres Professional: http://www.postgrespro.com
Russian Postgres Company
diff --git a/src/backend/catalog/dependency.c b/src/backend/catalog/dependency.c
index c48e37b..54be671 100644
--- a/src/backend/catalog/dependency.c
+++ b/src/backend/catalog/dependency.c
@@ -21,6 +21,7 @@
 #include "catalog/index.h"
 #include "catalog/objectaccess.h"
 #include "catalog/pg_amop.h"
+#include "catalog/pg_aggregate.h"
 #include "catalog/pg_amproc.h"
 #include "catalog/pg_attrdef.h"
 #include "catalog/pg_authid.h"
@@ -40,6 +41,7 @@
 #include "catalog/pg_foreign_server.h"
 #include "catalog/pg_language.h"
 #include "catalog/pg_largeobject.h"
+#include "catalog/pg_largeobject_metadata.h"
 #include "catalog/pg_namespace.h"
 #include "catalog/pg_opclass.h"
 #include "catalog/pg_operator.h"
@@ -56,6 +58,7 @@
 #include "catalog/pg_ts_template.h"
 #include "catalog/pg_type.h"
 #include "catalog/pg_user_mapping.h"
+#include "commands/alter.h"
 #include "commands/comment.h"
 #include "commands/defrem.h"
 #include "commands/event_trigger.h"
@@ -66,6 +69,7 @@
 #include "commands/seclabel.h"
 #include "commands/trigger.h"
 #include "commands/typecmds.h"
+#include "commands/tablecmds.h"
 #include "nodes/nodeFuncs.h"
 #include "parser/parsetree.h"
 #include "rewrite/rewriteRemove.h"
@@ -77,17 +81,6 @@
 #include "utils/tqual.h"
 
 
-/*
- * Deletion processing requires additional state for each ObjectAddress that
- * it's planning to delete.  For simplicity and code-sharing we make the
- * ObjectAddresses code support arrays with or without this extra state.
- */
-typedef struct
-{
-	int			flags;			/* bitmask, see bit definitions below */
-	ObjectAddress dependee;		/* object whose deletion forced this one */
-} ObjectAddressExtra;
-
 /* ObjectAddressExtra flag bits */
 #define DEPFLAG_ORIGINAL	0x0001		/* an original deletion target */
 #define DEPFLAG_NORMAL		0x0002		/* reached via normal dependency */
@@ -97,17 +90,6 @@ typedef struct
 #define DEPFLAG_REVERSE		0x0020		/* reverse internal/extension link */
 
 
-/* expansible list of ObjectAddresses */
-struct ObjectAddresses
-{
-	ObjectAddress *refs;		/* => palloc'd array */
-	ObjectAddressExtra *extras; /* => palloc'd array, or NULL if not used */
-	int			numrefs;		/* current number of references */
-	int			maxrefs;		/* current size of palloc'd array(s) */
-};
-
-/* typedef ObjectAddresses appears in dependency.h */
-
 /* threaded list of ObjectAddresses, for recursion detection */
 typedef struct ObjectAddressStack
 {
@@ -164,12 +146,21 @@ static const Oid object_classes[] = {
 };
 
 
+/*
+ * We limit the number of dependencies reported to the client to
+ * MAX_REPORTED_DEPS, since client software may not deal well with
+ * enormous error strings.  The server log always gets a full report.
+ */
+#define MAX_REPORTED_DEPS 100
+
+
 static void findDependentObjects(const ObjectAddress *object,
 					 int flags,
 					 ObjectAddressStack *stack,
 					 ObjectAddresses *targetObjects,
 					 const ObjectAddresses *pendingObjects,
-					 Relation *depRel);
+					 Relation *depRel,
+					 bool forDrop);
 static void reportDependentObjects(const ObjectAddresses *targetObjects,
 					   DropBehavior behavior,
 					   int msglevel,
@@ -243,6 +234,230 @@ deleteObjectsInList(ObjectAddresses *targetObjects, Relation *depRel,
 	}
 }
 
+void
+transferOwnershipCascade(const ObjectAddress *object, Oid newOwnerId)
+{
+	Relation			depRel = heap_open(DependRelationId, AccessShareLock);
+	ObjectAddresses	   *targetObjects = new_object_addresses();
+	StringInfoData		clientdetail;
+	StringInfoData		logdetail;
+	int					numReportedClient = 0;
+	int					numNotReportedClient = 0;
+	char			   *objTypeDesc = getObjectTypeDescription(object);
+	int					i;
+
+
+	initStringInfo(&clientdetail);
+	initStringInfo(&logdetail);
+
+	findDependentObjects(object,
+						 DEPFLAG_ORIGINAL,
+						 NULL,	/* empty stack */
+						 targetObjects,
+						 NULL,	/* no pendingObjects */
+						 &depRel,
+						 false);
+
+	for (i = targetObjects->numrefs - 1; i >= 0; i--)
+	{
+		ObjectAddress	   *thisobj   = targetObjects->refs + i;
+		ObjectAddressExtra *thisextra = targetObjects->extras + i;
+		char			   *objDesc;
+
+
+		/* We have already changed ownership of the initial object */
+		if (thisextra->flags & DEPFLAG_ORIGINAL)
+			continue;
+
+		/* We don't have to change the owner of the column */
+		if(thisobj->objectSubId != 0)
+			continue;
+
+		objDesc = getObjectDescription(thisobj);
+
+		if (thisextra->flags & (DEPFLAG_AUTO |
+								DEPFLAG_INTERNAL |
+								DEPFLAG_EXTENSION))
+		{
+			ereport(DEBUG2,
+					(errmsg("alter %s owner auto-cascades to %s",
+							objTypeDesc,
+							objDesc)));
+		}
+		else
+		{
+			if (numReportedClient < MAX_REPORTED_DEPS)
+			{
+				/* separate entries with a newline */
+				if (clientdetail.len != 0)
+					appendStringInfoChar(&clientdetail, '\n');
+				appendStringInfo(&clientdetail, _("alter %s owner cascades to %s"),
+								 objTypeDesc,
+								 objDesc);
+				numReportedClient++;
+			}
+			else
+				numNotReportedClient++;
+			/* separate entries with a newline */
+			if (logdetail.len != 0)
+				appendStringInfoChar(&logdetail, '\n');
+			appendStringInfo(&logdetail, _("alter %s owner cascades to %s"),
+							 objTypeDesc,
+							 objDesc);
+		}
+
+		/* change ownership of a single object in a correct way */
+		transferObjectOwnership(thisobj, thisextra, newOwnerId);
+	}
+
+	if (numNotReportedClient > 0)
+		appendStringInfo(&clientdetail, ngettext("\nand %d other object "
+												 "(see server log for list)",
+												 "\nand %d other objects "
+												 "(see server log for list)",
+												 numNotReportedClient),
+						 numNotReportedClient);
+
+	if (numReportedClient > 1)
+	{
+		ereport(NOTICE,
+		/* translator: %d always has a value larger than 1 */
+				(errmsg_plural("alter %s owner cascades to %d other object",
+							   "alter %s owner cascades to %d other objects",
+							   numReportedClient + numNotReportedClient,
+							   objTypeDesc,
+							   numReportedClient + numNotReportedClient),
+				errdetail("%s", clientdetail.data),
+				errdetail_log("%s", logdetail.data)));
+	}
+	else if (numReportedClient == 1)
+	{
+		/* we just use the single item as-is */
+		ereport(NOTICE,
+				(errmsg_internal("%s", clientdetail.data)));
+	}
+
+	pfree(clientdetail.data);
+	pfree(logdetail.data);
+
+	free_object_addresses(targetObjects);
+
+	heap_close(depRel, AccessShareLock);
+}
+
+void
+transferObjectOwnership(const ObjectAddress *object,
+						const ObjectAddressExtra *objExtra,
+						Oid newOwnerId)
+{
+	switch (object->classId)
+	{
+		case DatabaseRelationId:
+			break;
+
+		case NamespaceRelationId:
+			break;
+
+		case ForeignDataWrapperRelationId:
+			AlterForeignDataWrapperOwner_oid(object->objectId, newOwnerId);
+			break;
+
+		case ForeignServerRelationId:
+			AlterForeignServerOwner_oid(object->objectId, newOwnerId);
+			break;
+
+		case EventTriggerRelationId:
+			AlterEventTriggerOwner_oid(object->objectId, newOwnerId);
+			break;
+
+		/* Relations are handled in ATExecCmd */
+		case RelationRelationId:
+			{
+				Oid			relowner;
+				Datum		relid = ObjectIdGetDatum(object->objectId);
+				HeapTuple	tuple = SearchSysCache1(RELOID, relid);
+
+				if (!HeapTupleIsValid(tuple))
+					elog(ERROR, "cache lookup failed for relation %u",
+						 object->objectId);
+				relowner = ((Form_pg_class) GETSTRUCT(tuple))->relowner;
+
+				ReleaseSysCache(tuple);
+
+				/*
+				 * We must similarly update any per-column ACLs to reflect the new
+				 * owner; for neatness reasons that's split out as a subroutine.
+				 */
+				change_owner_fix_column_acls(object->objectId,
+											 relowner,
+											 newOwnerId);
+			}
+
+		/* Also domains */
+		case TypeRelationId:
+
+		case AggregateRelationId:
+		case CollationRelationId:
+		case ConversionRelationId:
+		case ProcedureRelationId:
+		case LanguageRelationId:
+		case LargeObjectRelationId:
+		case OperatorRelationId:
+		case OperatorClassRelationId:
+		case OperatorFamilyRelationId:
+		case TableSpaceRelationId:
+		case TSDictionaryRelationId:
+		case TSConfigRelationId:
+			{
+				Oid			nsp_id		  = get_object_namespace(object);
+				int			alter_actions = AO_ACTIONS_ALL;
+				Relation	catalog;
+
+				/*
+				 * Don't check CREATE privilege on namespace
+				 * for TOAST relations.
+				 */
+				if (nsp_id == PG_TOAST_NAMESPACE)
+					alter_actions ^= AO_PERMISSION_NAMESPACE_CREATE;
+
+				/*
+				 * Don't update owner dependency reference for
+				 * INNER & EXTENSION dependency types and toast
+				 * relations.
+				 */
+				if (nsp_id == PG_TOAST_NAMESPACE ||
+						!(objExtra->flags & (DEPFLAG_AUTO | DEPFLAG_NORMAL)))
+					alter_actions ^= AO_ACTION_CHANGE_DEP_OWNER;
+
+				catalog = heap_open(object->classId, RowExclusiveLock);
+				AlterObjectOwner_custom(catalog,
+										object->objectId,
+										newOwnerId,
+										alter_actions);
+				heap_close(catalog, RowExclusiveLock);
+			}
+			break;
+
+		default:
+			elog(DEBUG2, "unknown classId %d", object->classId);
+			break;
+	}
+}
+
+void
+collectDependentObjects(const ObjectAddress* object, ObjectAddresses* targetObjects,
+						Relation* depRel)
+{
+	findDependentObjects(object,
+						 DEPFLAG_ORIGINAL,
+						 NULL,	/* empty stack */
+						 targetObjects,
+						 NULL,	/* no pendingObjects */
+						 depRel,
+						 false);
+}
+
+
 /*
  * performDeletion: attempt to drop the specified object.  If CASCADE
  * behavior is specified, also drop any dependent objects (recursively).
@@ -292,7 +507,8 @@ performDeletion(const ObjectAddress *object,
 						 NULL,	/* empty stack */
 						 targetObjects,
 						 NULL,	/* no pendingObjects */
-						 &depRel);
+						 &depRel,
+						 true);
 
 	/*
 	 * Check if deletion is allowed, and report about cascaded deletes.
@@ -363,7 +579,8 @@ performMultipleDeletions(const ObjectAddresses *objects,
 							 NULL,		/* empty stack */
 							 targetObjects,
 							 objects,
-							 &depRel);
+							 &depRel,
+							 true);
 	}
 
 	/*
@@ -430,7 +647,8 @@ deleteWhatDependsOn(const ObjectAddress *object,
 						 NULL,	/* empty stack */
 						 targetObjects,
 						 NULL,	/* no pendingObjects */
-						 &depRel);
+						 &depRel,
+						 true);
 
 	/*
 	 * Check if deletion is allowed, and report about cascaded deletes.
@@ -502,7 +720,8 @@ findDependentObjects(const ObjectAddress *object,
 					 ObjectAddressStack *stack,
 					 ObjectAddresses *targetObjects,
 					 const ObjectAddresses *pendingObjects,
-					 Relation *depRel)
+					 Relation *depRel,
+					 bool forDrop)
 {
 	ScanKeyData key[3];
 	int			nkeys;
@@ -634,15 +853,20 @@ findDependentObjects(const ObjectAddress *object,
 						otherObject.objectId == CurrentExtensionObject)
 						break;
 
-					/* No exception applies, so throw the error */
-					otherObjDesc = getObjectDescription(&otherObject);
-					ereport(ERROR,
-							(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
-							 errmsg("cannot drop %s because %s requires it",
-									getObjectDescription(object),
-									otherObjDesc),
-							 errhint("You can drop %s instead.",
-									 otherObjDesc)));
+					if (forDrop)
+					{
+						/* No exception applies, so throw the error */
+						otherObjDesc = getObjectDescription(&otherObject);
+						ereport(ERROR,
+								(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
+								errmsg("cannot drop %s because %s requires it",
+										getObjectDescription(object),
+										otherObjDesc),
+								errhint("You can drop %s instead.",
+										otherObjDesc)));
+					}
+					else
+						break;
 				}
 
 				/*
@@ -698,7 +922,8 @@ findDependentObjects(const ObjectAddress *object,
 									 stack,
 									 targetObjects,
 									 pendingObjects,
-									 depRel);
+									 depRel,
+									 forDrop);
 				/* And we're done here. */
 				systable_endscan(scan);
 				return;
@@ -800,10 +1025,19 @@ findDependentObjects(const ObjectAddress *object,
 				 * For a PIN dependency we just ereport immediately; there
 				 * won't be any others to report.
 				 */
-				ereport(ERROR,
-						(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
-						 errmsg("cannot drop %s because it is required by the database system",
-								getObjectDescription(object))));
+				if (forDrop)
+				{
+					ereport(ERROR,
+							(errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
+							errmsg("cannot drop %s because it is required by the database system",
+									getObjectDescription(object))));
+				}
+				else
+				{
+					/* We shouldn't proceed for PIN dependencies */
+					systable_endscan(scan);
+					return;
+				}
 				subflags = 0;	/* keep compiler quiet */
 				break;
 			default:
@@ -818,7 +1052,8 @@ findDependentObjects(const ObjectAddress *object,
 							 &mystack,
 							 targetObjects,
 							 pendingObjects,
-							 depRel);
+							 depRel,
+							 forDrop);
 	}
 
 	systable_endscan(scan);
@@ -876,13 +1111,6 @@ reportDependentObjects(const ObjectAddresses *targetObjects,
 		(msglevel < log_min_messages || log_min_messages == LOG))
 		return;
 
-	/*
-	 * We limit the number of dependencies reported to the client to
-	 * MAX_REPORTED_DEPS, since client software may not deal well with
-	 * enormous error strings.  The server log always gets a full report.
-	 */
-#define MAX_REPORTED_DEPS 100
-
 	initStringInfo(&clientdetail);
 	initStringInfo(&logdetail);
 
diff --git a/src/backend/catalog/pg_shdepend.c b/src/backend/catalog/pg_shdepend.c
index 65ecc45..b50870a 100644
--- a/src/backend/catalog/pg_shdepend.c
+++ b/src/backend/catalog/pg_shdepend.c
@@ -1429,7 +1429,8 @@ shdepReassignOwned(List *roleids, Oid newrole)
 
 						catalog = heap_open(classId, RowExclusiveLock);
 
-						AlterObjectOwner_internal(catalog, sdepForm->objid,
+						AlterObjectOwner_internal(catalog,
+												  sdepForm->objid,
 												  newrole);
 
 						heap_close(catalog, NoLock);
diff --git a/src/backend/commands/alter.c b/src/backend/commands/alter.c
index 5af0f2f..11f8595 100644
--- a/src/backend/commands/alter.c
+++ b/src/backend/commands/alter.c
@@ -16,6 +16,7 @@
 
 #include "access/htup_details.h"
 #include "access/sysattr.h"
+#include "access/xact.h"
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
@@ -36,6 +37,9 @@
 #include "catalog/pg_ts_dict.h"
 #include "catalog/pg_ts_parser.h"
 #include "catalog/pg_ts_template.h"
+#include "catalog/pg_database.h"
+#include "catalog/pg_depend.h"
+#include "catalog/pg_type.h"
 #include "commands/alter.h"
 #include "commands/collationcmds.h"
 #include "commands/conversioncmds.h"
@@ -51,18 +55,25 @@
 #include "commands/trigger.h"
 #include "commands/typecmds.h"
 #include "commands/user.h"
-#include "parser/parse_func.h"
 #include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "parser/parse_func.h"
+#include "parser/parse_type.h"
+#include "postmaster/bgworker.h"
 #include "rewrite/rewriteDefine.h"
+#include "storage/dsm.h"
 #include "tcop/utility.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
 #include "utils/rel.h"
+#include "utils/resowner.h"
 #include "utils/syscache.h"
 #include "utils/tqual.h"
 
 
+static void	alterDbOwnerCascadeWorker(Datum arg);
+static ObjectAddress AlterDatabaseOwnerCascade(const char *dbname, Oid newowner);
 static Oid	AlterObjectNamespace_internal(Relation rel, Oid objid, Oid nspOid);
 
 /*
@@ -701,6 +712,190 @@ AlterObjectNamespace_internal(Relation rel, Oid objid, Oid nspOid)
 	return oldNspOid;
 }
 
+static void
+alterDbOwnerCascadeWorker(Datum arg)
+{
+	MemoryContext			oldcontext = CurrentMemoryContext;
+
+	dsm_handle				seg_handle = DatumGetUInt32(arg);
+	dsm_segment			   *seg;
+
+	volatile AlterDbDsm	   *worker_dsm;
+
+	Oid						newowner,
+							user,
+							db;
+
+
+	/*
+	 * First we have to create ResourceOwner, and only
+	 * then we are able to work with DSM.
+	 */
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "AlterDbOwnerCascadeWorker");
+
+	seg = dsm_attach(seg_handle);
+	worker_dsm = (AlterDbDsm *) dsm_segment_address(seg);
+
+	newowner = worker_dsm->newowner;
+	user	 = worker_dsm->user;
+	db		 = worker_dsm->db;
+
+
+	BackgroundWorkerUnblockSignals();
+
+	BackgroundWorkerInitializeConnectionByOid(db, user);
+
+	SetCurrentStatementStartTimestamp();
+    StartTransactionCommand();
+
+	PG_TRY();
+	{
+		AlterDatabaseOwner(get_database_name(db), newowner);
+
+		/* make changes to the database owner visible */
+		CommandCounterIncrement();
+		{
+			Relation			catalog;	/* pg_namespace */
+			Relation			depRel;		/* pg_depend */
+			HeapTuple			tuple;
+			ScanKeyData			scankey[3];
+			SysScanDesc			scan;
+
+			ObjectAddresses	   *targetObjects = new_object_addresses();
+			int					i;
+
+
+			ScanKeyInit(&scankey[0],
+						-2,
+						BTEqualStrategyNumber, F_OIDNE,
+						ObjectIdGetDatum(PG_CATALOG_NAMESPACE));
+			ScanKeyInit(&scankey[1],
+						-2,
+						BTEqualStrategyNumber, F_OIDNE,
+						ObjectIdGetDatum(PG_TOAST_NAMESPACE));
+			ScanKeyInit(&scankey[2],
+						-2,
+						BTEqualStrategyNumber, F_OIDNE,
+						ObjectIdGetDatum(PG_PUBLIC_NAMESPACE));
+
+			depRel = heap_open(DependRelationId, AccessShareLock);
+			catalog = heap_open(NamespaceRelationId, AccessShareLock);
+			scan = systable_beginscan(catalog, InvalidOid, false, NULL, 3, scankey);
+
+			while (HeapTupleIsValid(tuple = systable_getnext(scan)))
+			{
+				Form_pg_namespace	nspTup = (Form_pg_namespace) GETSTRUCT(tuple);
+				ObjectAddress		nspAddress;
+
+				ObjectAddressSet(nspAddress,
+								 NamespaceRelationId,
+								 HeapTupleGetOid(tuple));
+
+				if (!is_member_of_role(user, nspTup->nspowner) ||
+						0 == strcmp("information_schema",
+									get_namespace_name(nspAddress.objectId)))
+					continue;
+
+				AlterSchemaOwner_oid(nspAddress.objectId, newowner);
+				collectDependentObjects(&nspAddress, targetObjects, &depRel);
+			}
+
+			systable_endscan(scan);
+			heap_close(catalog, AccessShareLock);
+			heap_close(depRel, AccessShareLock);
+
+			/* make changes to the schemas' owners visible */
+			CommandCounterIncrement();
+
+			for (i = targetObjects->numrefs - 1; i >= 0; i--)
+				transferObjectOwnership(&targetObjects->refs[i],
+										&targetObjects->extras[i],
+										newowner);
+
+			free_object_addresses(targetObjects);
+		}
+
+		CommitTransactionCommand();
+
+		worker_dsm->ok = true;
+	}
+	PG_CATCH();
+	{
+		ErrorData *edata;
+
+		AbortCurrentTransaction();
+
+		/* Must reset elog.c's state */
+		MemoryContextSwitchTo(oldcontext);
+		edata = CopyErrorData();
+		FlushErrorState();
+
+		strncpy((char *) worker_dsm->message,
+				edata->message,
+				sizeof(worker_dsm->message));
+
+		worker_dsm->ok = false;
+
+		FreeErrorData(edata);
+	}
+	PG_END_TRY();
+
+	dsm_detach(seg);
+}
+
+static ObjectAddress
+AlterDatabaseOwnerCascade(const char *dbname, Oid newowner)
+{
+	ObjectAddress			address;
+
+	Oid						db_id = get_database_oid(dbname, false);
+
+	dsm_segment			   *seg = dsm_create(sizeof(AlterDbDsm), 0);
+	dsm_handle				seg_handle = dsm_segment_handle(seg);
+
+	volatile AlterDbDsm	   *worker_dsm = (AlterDbDsm *) dsm_segment_address(seg);
+
+	BgwHandleStatus			status;
+	BackgroundWorkerHandle *handle;
+	BackgroundWorker		worker;
+
+	/* initialize worker args */
+	worker_dsm->newowner = newowner;
+	worker_dsm->user	 = GetUserId();
+	worker_dsm->db		 = db_id;
+	worker_dsm->ok		 = false;
+
+	sprintf(worker.bgw_name, "AlterDatabaseOwnerCascadeWorker");
+	worker.bgw_flags		= BGWORKER_SHMEM_ACCESS |
+								BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time	= BgWorkerStart_RecoveryFinished;
+	worker.bgw_restart_time	= BGW_NEVER_RESTART;
+	worker.bgw_main			= alterDbOwnerCascadeWorker;
+	worker.bgw_main_arg		= UInt32GetDatum(seg_handle);
+	worker.bgw_notify_pid	= MyProcPid;
+
+	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
+		elog(ERROR, "Could not start background worker");
+
+	status = WaitForBackgroundWorkerShutdown(handle);
+	if (status != BGWH_STOPPED)
+		elog(ERROR, "BackgroundWorker exited unexpectedly");
+
+	pfree(handle);
+
+	if (!worker_dsm->ok)
+	{
+		char *error_message = pstrdup((char *) worker_dsm->message);
+		elog(ERROR, "%s", error_message);
+	}
+	else
+		dsm_detach(seg);
+
+	ObjectAddressSet(address, DatabaseRelationId, db_id);
+
+	return address;
+}
+
 /*
  * Executes an ALTER OBJECT / OWNER TO statement.  Based on the object
  * type, the function appropriate to that type is executed.
@@ -708,32 +903,48 @@ AlterObjectNamespace_internal(Relation rel, Oid objid, Oid nspOid)
 ObjectAddress
 ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
 {
-	Oid			newowner = get_rolespec_oid(stmt->newowner, false);
+	Oid newowner = get_rolespec_oid(stmt->newowner, false);
 
 	switch (stmt->objectType)
 	{
 		case OBJECT_DATABASE:
-			return AlterDatabaseOwner(strVal(linitial(stmt->object)), newowner);
+			if (stmt->cascade)
+				return AlterDatabaseOwnerCascade(strVal(linitial(stmt->object)),
+												 newowner);
+			else
+				return AlterDatabaseOwner(strVal(linitial(stmt->object)),
+										  newowner);
 
 		case OBJECT_SCHEMA:
-			return AlterSchemaOwner(strVal(linitial(stmt->object)), newowner);
+			return AlterSchemaOwner(strVal(linitial(stmt->object)), newowner,
+									stmt->cascade);
 
 		case OBJECT_TYPE:
 		case OBJECT_DOMAIN:		/* same as TYPE */
-			return AlterTypeOwner(stmt->object, newowner, stmt->objectType);
-			break;
+			/*
+			 * Both "cascade" and normal cases are coped with within
+			 * the facade AlterTypeOwner function which performs
+			 * various safety checks.
+			 */
+			return AlterTypeOwner(stmt->object,
+								  newowner,
+								  stmt->objectType,
+								  stmt->cascade);
 
 		case OBJECT_FDW:
 			return AlterForeignDataWrapperOwner(strVal(linitial(stmt->object)),
-												newowner);
+												newowner,
+												stmt->cascade);
 
 		case OBJECT_FOREIGN_SERVER:
 			return AlterForeignServerOwner(strVal(linitial(stmt->object)),
-										   newowner);
+										   newowner,
+										   stmt->cascade);
 
 		case OBJECT_EVENT_TRIGGER:
 			return AlterEventTriggerOwner(strVal(linitial(stmt->object)),
-										  newowner);
+										  newowner,
+										  stmt->cascade);
 
 			/* Generic cases */
 		case OBJECT_AGGREGATE:
@@ -749,10 +960,10 @@ ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
 		case OBJECT_TSDICTIONARY:
 		case OBJECT_TSCONFIGURATION:
 			{
-				Relation	catalog;
-				Relation	relation;
-				Oid			classId;
-				ObjectAddress address;
+				ObjectAddress	address;
+				Relation		catalog;
+				Relation		relation;
+				Oid				classId;
 
 				address = get_object_address(stmt->objectType,
 											 stmt->object,
@@ -772,10 +983,14 @@ ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
 					classId = LargeObjectMetadataRelationId;
 
 				catalog = heap_open(classId, RowExclusiveLock);
-
-				AlterObjectOwner_internal(catalog, address.objectId, newowner);
+				AlterObjectOwner_internal(catalog,
+										  address.objectId,
+										  newowner);
 				heap_close(catalog, RowExclusiveLock);
 
+				if (stmt->cascade)
+					transferOwnershipCascade(&address, newowner);
+
 				return address;
 			}
 			break;
@@ -787,6 +1002,12 @@ ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
 	}
 }
 
+void
+AlterObjectOwner_internal(Relation rel, Oid objectId, Oid new_ownerId)
+{
+	AlterObjectOwner_custom(rel, objectId, new_ownerId, AO_ACTIONS_ALL);
+}
+
 /*
  * Generic function to change the ownership of a given object, for simple
  * cases (won't work for tables, nor other cases where we need to do more than
@@ -795,9 +1016,10 @@ ExecAlterOwnerStmt(AlterOwnerStmt *stmt)
  * rel: catalog relation containing object (RowExclusiveLock'd by caller)
  * objectId: OID of object to change the ownership of
  * new_ownerId: OID of new object owner
+ * actions: enabled checks
  */
 void
-AlterObjectOwner_internal(Relation rel, Oid objectId, Oid new_ownerId)
+AlterObjectOwner_custom(Relation rel, Oid objectId, Oid new_ownerId, int actions)
 {
 	Oid			classId = RelationGetRelid(rel);
 	AttrNumber	Anum_owner = get_object_attnum_owner(classId);
@@ -841,8 +1063,9 @@ AlterObjectOwner_internal(Relation rel, Oid objectId, Oid new_ownerId)
 		{
 			AclObjectKind aclkind = get_object_aclkind(classId);
 
-			/* must be owner */
-			if (!has_privs_of_role(GetUserId(), old_ownerId))
+			/* Must be owner */
+			if (actions & AO_PERMISSION_CURRENT_OWNER
+					&& !has_privs_of_role(GetUserId(), old_ownerId))
 			{
 				char	   *objname;
 				char		namebuf[NAMEDATALEN];
@@ -863,7 +1086,8 @@ AlterObjectOwner_internal(Relation rel, Oid objectId, Oid new_ownerId)
 				aclcheck_error(ACLCHECK_NOT_OWNER, aclkind, objname);
 			}
 			/* Must be able to become new owner */
-			check_is_member_of_role(GetUserId(), new_ownerId);
+			if (actions & AO_PERMISSION_NEW_OWNER)
+				check_is_member_of_role(GetUserId(), new_ownerId);
 
 			/* New owner must have CREATE privilege on namespace */
 			if (OidIsValid(namespaceId))
@@ -872,7 +1096,8 @@ AlterObjectOwner_internal(Relation rel, Oid objectId, Oid new_ownerId)
 
 				aclresult = pg_namespace_aclcheck(namespaceId, new_ownerId,
 												  ACL_CREATE);
-				if (aclresult != ACLCHECK_OK)
+				if (actions & AO_PERMISSION_NAMESPACE_CREATE
+						&& aclresult != ACLCHECK_OK)
 					aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
 								   get_namespace_name(namespaceId));
 			}
@@ -915,7 +1140,8 @@ AlterObjectOwner_internal(Relation rel, Oid objectId, Oid new_ownerId)
 		/* Update owner dependency reference */
 		if (classId == LargeObjectMetadataRelationId)
 			classId = LargeObjectRelationId;
-		changeDependencyOnOwner(classId, HeapTupleGetOid(newtup), new_ownerId);
+		if (actions & AO_ACTION_CHANGE_DEP_OWNER)
+			changeDependencyOnOwner(classId, HeapTupleGetOid(newtup), new_ownerId);
 
 		/* Release memory */
 		pfree(values);
diff --git a/src/backend/commands/event_trigger.c b/src/backend/commands/event_trigger.c
index 9e32f8d..dd50b01 100644
--- a/src/backend/commands/event_trigger.c
+++ b/src/backend/commands/event_trigger.c
@@ -538,7 +538,7 @@ AlterEventTrigger(AlterEventTrigStmt *stmt)
  * Change event trigger's owner -- by name
  */
 ObjectAddress
-AlterEventTriggerOwner(const char *name, Oid newOwnerId)
+AlterEventTriggerOwner(const char *name, Oid newOwnerId, bool cascade)
 {
 	Oid			evtOid;
 	HeapTuple	tup;
@@ -564,6 +564,9 @@ AlterEventTriggerOwner(const char *name, Oid newOwnerId)
 
 	heap_close(rel, RowExclusiveLock);
 
+	if (cascade)
+		transferOwnershipCascade(&address, newOwnerId);
+
 	return address;
 }
 
diff --git a/src/backend/commands/foreigncmds.c b/src/backend/commands/foreigncmds.c
index 7036eb1..c7b6bfd 100644
--- a/src/backend/commands/foreigncmds.c
+++ b/src/backend/commands/foreigncmds.c
@@ -275,7 +275,7 @@ AlterForeignDataWrapperOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerI
  * Note restrictions in the "_internal" function, above.
  */
 ObjectAddress
-AlterForeignDataWrapperOwner(const char *name, Oid newOwnerId)
+AlterForeignDataWrapperOwner(const char *name, Oid newOwnerId, bool cascade)
 {
 	Oid			fdwId;
 	HeapTuple	tup;
@@ -301,6 +301,13 @@ AlterForeignDataWrapperOwner(const char *name, Oid newOwnerId)
 
 	heap_close(rel, RowExclusiveLock);
 
+	if (cascade)
+	{
+		/* make changes to the FDW's owner visible */
+		CommandCounterIncrement();
+		transferOwnershipCascade(&address, newOwnerId);
+	}
+
 	return address;
 }
 
@@ -413,7 +420,7 @@ AlterForeignServerOwner_internal(Relation rel, HeapTuple tup, Oid newOwnerId)
  * Change foreign server owner -- by name
  */
 ObjectAddress
-AlterForeignServerOwner(const char *name, Oid newOwnerId)
+AlterForeignServerOwner(const char *name, Oid newOwnerId, bool cascade)
 {
 	Oid			servOid;
 	HeapTuple	tup;
@@ -439,6 +446,9 @@ AlterForeignServerOwner(const char *name, Oid newOwnerId)
 
 	heap_close(rel, RowExclusiveLock);
 
+	if (cascade)
+		transferOwnershipCascade(&address, newOwnerId);
+
 	return address;
 }
 
diff --git a/src/backend/commands/schemacmds.c b/src/backend/commands/schemacmds.c
index a60ceb8..28059db 100644
--- a/src/backend/commands/schemacmds.c
+++ b/src/backend/commands/schemacmds.c
@@ -301,7 +301,7 @@ AlterSchemaOwner_oid(Oid oid, Oid newOwnerId)
  * Change schema owner
  */
 ObjectAddress
-AlterSchemaOwner(const char *name, Oid newOwnerId)
+AlterSchemaOwner(const char *name, Oid newOwnerId, bool cascade)
 {
 	Oid			nspOid;
 	HeapTuple	tup;
@@ -326,6 +326,13 @@ AlterSchemaOwner(const char *name, Oid newOwnerId)
 
 	heap_close(rel, RowExclusiveLock);
 
+	if (cascade)
+	{
+		/* make changes to the schema owner visible */
+		CommandCounterIncrement();
+		transferOwnershipCascade(&address, newOwnerId);
+	}
+
 	return address;
 }
 
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 96dc923..346ef38 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -46,6 +46,7 @@
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
 #include "catalog/toasting.h"
+#include "commands/alter.h"
 #include "commands/cluster.h"
 #include "commands/comment.h"
 #include "commands/defrem.h"
@@ -395,12 +396,11 @@ static void RebuildConstraintComment(AlteredTableInfo *tab, int pass,
 						 Oid objid, Relation rel, char *conname);
 static void TryReuseIndex(Oid oldId, IndexStmt *stmt);
 static void TryReuseForeignKey(Oid oldId, Constraint *con);
-static void change_owner_fix_column_acls(Oid relationOid,
-							 Oid oldOwnerId, Oid newOwnerId);
 static void change_owner_recurse_to_sequences(Oid relationOid,
 								  Oid newOwnerId, LOCKMODE lockmode);
 static ObjectAddress ATExecClusterOn(Relation rel, const char *indexName,
 				LOCKMODE lockmode);
+static void ATExecChangeOwnerCascade(Relation rel, Oid newOwnerId);
 static void ATExecDropCluster(Relation rel, LOCKMODE lockmode);
 static bool ATPrepChangePersistence(Relation rel, bool toLogged);
 static void ATPrepSetTableSpace(AlteredTableInfo *tab, Relation rel,
@@ -3566,9 +3566,14 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
 												(List *) cmd->def, lockmode);
 			break;
 		case AT_ChangeOwner:	/* ALTER OWNER */
-			ATExecChangeOwner(RelationGetRelid(rel),
-							  get_rolespec_oid(cmd->newowner, false),
-							  false, lockmode);
+			if (cmd->cascade)
+				ATExecChangeOwnerCascade(rel,
+										 get_rolespec_oid(cmd->newowner,
+														  false));
+			else
+				ATExecChangeOwner(RelationGetRelid(rel),
+								  get_rolespec_oid(cmd->newowner, false),
+								  false, lockmode);
 			break;
 		case AT_ClusterOn:		/* CLUSTER ON */
 			address = ATExecClusterOn(rel, cmd->name, lockmode);
@@ -9131,13 +9136,93 @@ ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lock
 	relation_close(target_rel, NoLock);
 }
 
+static void
+ATExecChangeOwnerCascade(Relation rel, Oid newOwnerId)
+{
+	Form_pg_class	formPgClass = RelationGetForm(rel);
+	Relation		catalog; /* pg_class */
+	ObjectAddress	objAddr;
+
+	ObjectAddressSet(objAddr,
+					 RelationRelationId,
+					 RelationGetRelid(rel));
+
+	/* Can we change the ownership of this tuple? */
+	switch (formPgClass->relkind)
+	{
+		case RELKIND_RELATION:
+		case RELKIND_VIEW:
+		case RELKIND_MATVIEW:
+		case RELKIND_FOREIGN_TABLE:
+			/* ok to change owner */
+			break;
+		case RELKIND_INDEX:
+			if (formPgClass->relowner != newOwnerId)
+				ereport(WARNING,
+						(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						errmsg("cannot change owner of index \"%s\"",
+								NameStr(formPgClass->relname)),
+						errhint("Change the ownership of the index's table, instead.")));
+			newOwnerId = formPgClass->relowner;
+			break;
+		case RELKIND_SEQUENCE:
+			if (formPgClass->relowner != newOwnerId)
+			{
+				/* if it's an owned sequence, disallow changing it by itself */
+				Oid			tableId;
+				int32		colId;
+
+				if (sequenceIsOwned(RelationGetRelid(rel), &tableId, &colId))
+					ereport(ERROR,
+							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("cannot change owner of sequence \"%s\"",
+									NameStr(formPgClass->relname)),
+					errdetail("Sequence \"%s\" is linked to table \"%s\".",
+								NameStr(formPgClass->relname),
+								get_rel_name(tableId))));
+			}
+			break;
+		case RELKIND_COMPOSITE_TYPE:
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					errmsg("\"%s\" is a composite type",
+							NameStr(formPgClass->relname)),
+					errhint("Use ALTER TYPE instead.")));
+			break;
+
+		default:
+			ereport(ERROR,
+					(errcode(ERRCODE_WRONG_OBJECT_TYPE),
+					errmsg("\"%s\" is not a table, view, sequence, or foreign table",
+						NameStr(formPgClass->relname))));
+	}
+
+	/* Change owner of the relation */
+	catalog = heap_open(RelationRelationId, RowExclusiveLock);
+	AlterObjectOwner_internal(catalog,
+								objAddr.objectId,
+								newOwnerId);
+	heap_close(catalog, RowExclusiveLock);
+
+	/*
+		* We must similarly update any per-column ACLs to reflect the new
+		* owner; for neatness reasons that's split out as a subroutine.
+		*/
+	change_owner_fix_column_acls(objAddr.objectId,
+									formPgClass->relowner,
+									newOwnerId);
+
+	/* Change owner of its dependencies */
+	transferOwnershipCascade(&objAddr, newOwnerId);
+}
+
 /*
  * change_owner_fix_column_acls
  *
  * Helper function for ATExecChangeOwner.  Scan the columns of the table
  * and fix any non-null column ACLs to reflect the new owner.
  */
-static void
+void
 change_owner_fix_column_acls(Oid relationOid, Oid oldOwnerId, Oid newOwnerId)
 {
 	Relation	attRelation;
diff --git a/src/backend/commands/typecmds.c b/src/backend/commands/typecmds.c
index 227d382..49ea7cd 100644
--- a/src/backend/commands/typecmds.c
+++ b/src/backend/commands/typecmds.c
@@ -51,6 +51,7 @@
 #include "catalog/pg_range.h"
 #include "catalog/pg_type.h"
 #include "catalog/pg_type_fn.h"
+#include "commands/alter.h"
 #include "commands/defrem.h"
 #include "commands/tablecmds.h"
 #include "commands/typecmds.h"
@@ -3189,7 +3190,7 @@ RenameType(RenameStmt *stmt)
  * Change the owner of a type.
  */
 ObjectAddress
-AlterTypeOwner(List *names, Oid newOwnerId, ObjectType objecttype)
+AlterTypeOwner(List *names, Oid newOwnerId, ObjectType objecttype, bool cascade)
 {
 	TypeName   *typename;
 	Oid			typeOid;
@@ -3250,36 +3251,50 @@ AlterTypeOwner(List *names, Oid newOwnerId, ObjectType objecttype)
 				 errhint("You can alter type %s, which will alter the array type as well.",
 						 format_type_be(typTup->typelem))));
 
+	ObjectAddressSet(address, TypeRelationId, typeOid);
+
 	/*
-	 * If the new owner is the same as the existing owner, consider the
-	 * command to have succeeded.  This is for dump restoration purposes.
+	 * After we've performed various checks,
+	 * it's time to either use the simple "cascade"
+	 * object ownership trasfer or change type's
+	 * owner individually.
 	 */
-	if (typTup->typowner != newOwnerId)
+	if (cascade)
 	{
-		/* Superusers can always do it */
-		if (!superuser())
+		AlterObjectOwner_internal(rel, typeOid, newOwnerId);
+		transferOwnershipCascade(&address, newOwnerId);
+	}
+	else
+	{
+		/*
+		 * If the new owner is the same as the existing owner, consider the
+		 * command to have succeeded.  This is for dump restoration purposes.
+		 */
+		if (typTup->typowner != newOwnerId)
 		{
-			/* Otherwise, must be owner of the existing object */
-			if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()))
-				aclcheck_error_type(ACLCHECK_NOT_OWNER, HeapTupleGetOid(tup));
-
-			/* Must be able to become new owner */
-			check_is_member_of_role(GetUserId(), newOwnerId);
-
-			/* New owner must have CREATE privilege on namespace */
-			aclresult = pg_namespace_aclcheck(typTup->typnamespace,
-											  newOwnerId,
-											  ACL_CREATE);
-			if (aclresult != ACLCHECK_OK)
-				aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
-							   get_namespace_name(typTup->typnamespace));
-		}
+			/* Superusers can always do it */
+			if (!superuser())
+			{
+				/* Otherwise, must be owner of the existing object */
+				if (!pg_type_ownercheck(HeapTupleGetOid(tup), GetUserId()))
+					aclcheck_error_type(ACLCHECK_NOT_OWNER, HeapTupleGetOid(tup));
+
+				/* Must be able to become new owner */
+				check_is_member_of_role(GetUserId(), newOwnerId);
+
+				/* New owner must have CREATE privilege on namespace */
+				aclresult = pg_namespace_aclcheck(typTup->typnamespace,
+												  newOwnerId,
+												  ACL_CREATE);
+				if (aclresult != ACLCHECK_OK)
+					aclcheck_error(aclresult, ACL_KIND_NAMESPACE,
+								   get_namespace_name(typTup->typnamespace));
+			}
 
-		AlterTypeOwner_oid(typeOid, newOwnerId, true);
+			AlterTypeOwner_oid(typeOid, newOwnerId, true);
+		}
 	}
 
-	ObjectAddressSet(address, TypeRelationId, typeOid);
-
 	/* Clean up */
 	heap_close(rel, RowExclusiveLock);
 
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a9e9cc3..520d9f7 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -2846,6 +2846,7 @@ _copyAlterTableCmd(const AlterTableCmd *from)
 	COPY_SCALAR_FIELD(subtype);
 	COPY_STRING_FIELD(name);
 	COPY_NODE_FIELD(newowner);
+	COPY_SCALAR_FIELD(cascade);
 	COPY_NODE_FIELD(def);
 	COPY_SCALAR_FIELD(behavior);
 	COPY_SCALAR_FIELD(missing_ok);
@@ -3225,6 +3226,7 @@ _copyAlterOwnerStmt(const AlterOwnerStmt *from)
 	COPY_NODE_FIELD(object);
 	COPY_NODE_FIELD(objarg);
 	COPY_NODE_FIELD(newowner);
+	COPY_SCALAR_FIELD(cascade);
 
 	return newnode;
 }
diff --git a/src/backend/nodes/equalfuncs.c b/src/backend/nodes/equalfuncs.c
index b9c3959..ce3bce4 100644
--- a/src/backend/nodes/equalfuncs.c
+++ b/src/backend/nodes/equalfuncs.c
@@ -1030,6 +1030,7 @@ _equalAlterTableCmd(const AlterTableCmd *a, const AlterTableCmd *b)
 	COMPARE_SCALAR_FIELD(subtype);
 	COMPARE_STRING_FIELD(name);
 	COMPARE_NODE_FIELD(newowner);
+	COMPARE_SCALAR_FIELD(cascade);
 	COMPARE_NODE_FIELD(def);
 	COMPARE_SCALAR_FIELD(behavior);
 	COMPARE_SCALAR_FIELD(missing_ok);
@@ -1345,6 +1346,7 @@ _equalAlterOwnerStmt(const AlterOwnerStmt *a, const AlterOwnerStmt *b)
 	COMPARE_NODE_FIELD(object);
 	COMPARE_NODE_FIELD(objarg);
 	COMPARE_NODE_FIELD(newowner);
+	COMPARE_SCALAR_FIELD(cascade);
 
 	return true;
 }
diff --git a/src/backend/parser/gram.y b/src/backend/parser/gram.y
index b307b48..709de93 100644
--- a/src/backend/parser/gram.y
+++ b/src/backend/parser/gram.y
@@ -277,6 +277,8 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 
 %type <dbehavior>	opt_drop_behavior
 
+%type <boolean>	opt_alter_owner_behavior
+
 %type <list>	createdb_opt_list createdb_opt_items copy_opt_list
 				transaction_mode_list
 				create_extension_opt_list alter_extension_opt_list
@@ -2290,11 +2292,12 @@ alter_table_cmd:
 					$$ = (Node *)n;
 				}
 			/* ALTER TABLE <name> OWNER TO RoleSpec */
-			| OWNER TO RoleSpec
+			| OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterTableCmd *n = makeNode(AlterTableCmd);
 					n->subtype = AT_ChangeOwner;
 					n->newowner = $3;
+					n->cascade = $4;
 					$$ = (Node *)n;
 				}
 			/* ALTER TABLE <name> SET TABLESPACE <tablespacename> */
@@ -2376,6 +2379,11 @@ opt_drop_behavior:
 			| RESTRICT					{ $$ = DROP_RESTRICT; }
 			| /* EMPTY */				{ $$ = DROP_RESTRICT; /* default */ }
 		;
+		
+opt_alter_owner_behavior:
+			CASCADE						{ $$ = true; }
+			| /* EMPTY */				{ $$ = false; /* default */ }
+		;
 
 opt_collate_clause:
 			COLLATE any_name
@@ -8266,159 +8274,178 @@ operator_def_elem: ColLabel '=' NONE
  *
  *****************************************************************************/
 
-AlterOwnerStmt: ALTER AGGREGATE func_name aggr_args OWNER TO RoleSpec
+AlterOwnerStmt: ALTER AGGREGATE func_name aggr_args OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_AGGREGATE;
 					n->object = $3;
 					n->objarg = extractAggrArgTypes($4);
 					n->newowner = $7;
+					n->cascade = $8;
 					$$ = (Node *)n;
 				}
-			| ALTER COLLATION any_name OWNER TO RoleSpec
+			| ALTER COLLATION any_name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_COLLATION;
 					n->object = $3;
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER CONVERSION_P any_name OWNER TO RoleSpec
+			| ALTER CONVERSION_P any_name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_CONVERSION;
 					n->object = $3;
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER DATABASE database_name OWNER TO RoleSpec
+			| ALTER DATABASE database_name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_DATABASE;
 					n->object = list_make1(makeString($3));
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER DOMAIN_P any_name OWNER TO RoleSpec
+			| ALTER DOMAIN_P any_name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_DOMAIN;
 					n->object = $3;
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER FUNCTION function_with_argtypes OWNER TO RoleSpec
+			| ALTER FUNCTION function_with_argtypes OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_FUNCTION;
 					n->object = $3->funcname;
 					n->objarg = $3->funcargs;
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER opt_procedural LANGUAGE name OWNER TO RoleSpec
+			| ALTER opt_procedural LANGUAGE name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_LANGUAGE;
 					n->object = list_make1(makeString($4));
 					n->newowner = $7;
+					n->cascade = $8;
 					$$ = (Node *)n;
 				}
-			| ALTER LARGE_P OBJECT_P NumericOnly OWNER TO RoleSpec
+			| ALTER LARGE_P OBJECT_P NumericOnly OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_LARGEOBJECT;
 					n->object = list_make1($4);
 					n->newowner = $7;
+					n->cascade = $8;
 					$$ = (Node *)n;
 				}
-			| ALTER OPERATOR any_operator oper_argtypes OWNER TO RoleSpec
+			| ALTER OPERATOR any_operator oper_argtypes OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_OPERATOR;
 					n->object = $3;
 					n->objarg = $4;
 					n->newowner = $7;
+					n->cascade = $8;
 					$$ = (Node *)n;
 				}
-			| ALTER OPERATOR CLASS any_name USING access_method OWNER TO RoleSpec
+			| ALTER OPERATOR CLASS any_name USING access_method OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_OPCLASS;
 					n->object = lcons(makeString($6), $4);
 					n->newowner = $9;
+					n->cascade = $10;
 					$$ = (Node *)n;
 				}
-			| ALTER OPERATOR FAMILY any_name USING access_method OWNER TO RoleSpec
+			| ALTER OPERATOR FAMILY any_name USING access_method OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_OPFAMILY;
 					n->object = lcons(makeString($6), $4);
 					n->newowner = $9;
+					n->cascade = $10;
 					$$ = (Node *)n;
 				}
-			| ALTER SCHEMA name OWNER TO RoleSpec
+			| ALTER SCHEMA name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_SCHEMA;
 					n->object = list_make1(makeString($3));
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER TYPE_P any_name OWNER TO RoleSpec
+			| ALTER TYPE_P any_name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_TYPE;
 					n->object = $3;
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER TABLESPACE name OWNER TO RoleSpec
+			| ALTER TABLESPACE name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_TABLESPACE;
 					n->object = list_make1(makeString($3));
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER TEXT_P SEARCH DICTIONARY any_name OWNER TO RoleSpec
+			| ALTER TEXT_P SEARCH DICTIONARY any_name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_TSDICTIONARY;
 					n->object = $5;
 					n->newowner = $8;
+					n->cascade = $9;
 					$$ = (Node *)n;
 				}
-			| ALTER TEXT_P SEARCH CONFIGURATION any_name OWNER TO RoleSpec
+			| ALTER TEXT_P SEARCH CONFIGURATION any_name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_TSCONFIGURATION;
 					n->object = $5;
 					n->newowner = $8;
+					n->cascade = $9;
 					$$ = (Node *)n;
 				}
-			| ALTER FOREIGN DATA_P WRAPPER name OWNER TO RoleSpec
+			| ALTER FOREIGN DATA_P WRAPPER name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_FDW;
 					n->object = list_make1(makeString($5));
 					n->newowner = $8;
+					n->cascade = $9;
 					$$ = (Node *)n;
 				}
-			| ALTER SERVER name OWNER TO RoleSpec
+			| ALTER SERVER name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_FOREIGN_SERVER;
 					n->object = list_make1(makeString($3));
 					n->newowner = $6;
+					n->cascade = $7;
 					$$ = (Node *)n;
 				}
-			| ALTER EVENT TRIGGER name OWNER TO RoleSpec
+			| ALTER EVENT TRIGGER name OWNER TO RoleSpec opt_alter_owner_behavior
 				{
 					AlterOwnerStmt *n = makeNode(AlterOwnerStmt);
 					n->objectType = OBJECT_EVENT_TRIGGER;
 					n->object = list_make1(makeString($4));
 					n->newowner = $7;
+					n->cascade = $8;
 					$$ = (Node *)n;
 				}
 		;
diff --git a/src/bin/psql/tab-complete.c b/src/bin/psql/tab-complete.c
index 5f27120..d0cac02 100644
--- a/src/bin/psql/tab-complete.c
+++ b/src/bin/psql/tab-complete.c
@@ -2616,6 +2616,9 @@ psql_completion(const char *text, int start, int end)
 /* OWNER TO  - complete with available roles */
 	else if (TailMatches2("OWNER", "TO"))
 		COMPLETE_WITH_QUERY(Query_for_list_of_roles);
+	
+	else if (TailMatches3("OWNER", "TO", MatchAny))
+		COMPLETE_WITH_LIST2("", "CASCADE");
 
 /* ORDER BY */
 	else if (TailMatches3("FROM", MatchAny, "ORDER"))
diff --git a/src/include/catalog/dependency.h b/src/include/catalog/dependency.h
index 049bf9f..47e00ed 100644
--- a/src/include/catalog/dependency.h
+++ b/src/include/catalog/dependency.h
@@ -15,6 +15,7 @@
 #define DEPENDENCY_H
 
 #include "catalog/objectaddress.h"
+#include "utils/hsearch.h"
 
 
 /*
@@ -112,8 +113,25 @@ typedef enum SharedDependencyType
 	SHARED_DEPENDENCY_INVALID = 0
 } SharedDependencyType;
 
-/* expansible list of ObjectAddresses (private in dependency.c) */
-typedef struct ObjectAddresses ObjectAddresses;
+/*
+ * Deletion processing requires additional state for each ObjectAddress that
+ * it's planning to delete.  For simplicity and code-sharing we make the
+ * ObjectAddresses code support arrays with or without this extra state.
+ */
+typedef struct ObjectAddressExtra
+{
+	int			flags;			/* bitmask, see bit definitions below */
+	ObjectAddress dependee;		/* object whose deletion forced this one */
+} ObjectAddressExtra;
+
+/* expansible list of ObjectAddresses */
+typedef struct ObjectAddresses
+{
+	ObjectAddress *refs;		/* => palloc'd array */
+	ObjectAddressExtra *extras; /* => palloc'd array, or NULL if not used */
+	int			numrefs;		/* current number of references */
+	int			maxrefs;		/* current size of palloc'd array(s) */
+} ObjectAddresses;
 
 /*
  * This enum covers all system catalogs whose OIDs can appear in
@@ -164,6 +182,17 @@ typedef enum ObjectClass
 #define PERFORM_DELETION_INTERNAL			0x0001
 #define PERFORM_DELETION_CONCURRENTLY		0x0002
 
+
+extern void collectDependentObjects(const ObjectAddress *object,
+									ObjectAddresses *targetObjects,
+									Relation *depRel);
+
+extern void transferObjectOwnership(const ObjectAddress *object,
+									const ObjectAddressExtra *objExtra,
+									Oid newOwnerId);
+
+extern void transferOwnershipCascade(const ObjectAddress *object, Oid newOwnerId);
+
 extern void performDeletion(const ObjectAddress *object,
 				DropBehavior behavior, int flags);
 
diff --git a/src/include/commands/alter.h b/src/include/commands/alter.h
index cf92e3e..9a268f2 100644
--- a/src/include/commands/alter.h
+++ b/src/include/commands/alter.h
@@ -19,6 +19,16 @@
 #include "nodes/parsenodes.h"
 #include "utils/relcache.h"
 
+typedef struct AlterDbDsm
+{
+	Oid		newowner;
+	Oid		user;
+	Oid		db;
+	bool	ok; /* exit status */
+	
+	char	message[1024];
+} AlterDbDsm;
+
 extern ObjectAddress ExecRenameStmt(RenameStmt *stmt);
 
 extern ObjectAddress ExecAlterObjectSchemaStmt(AlterObjectSchemaStmt *stmt,
@@ -27,7 +37,27 @@ extern Oid AlterObjectNamespace_oid(Oid classId, Oid objid, Oid nspOid,
 						 ObjectAddresses *objsMoved);
 
 extern ObjectAddress ExecAlterOwnerStmt(AlterOwnerStmt *stmt);
+
+/* Must be owner */
+#define AO_PERMISSION_CURRENT_OWNER		(1)
+/* Must be able to become new owner */
+#define AO_PERMISSION_NEW_OWNER			(1 << 1)
+/* New owner must have CREATE privilege on namespace */
+#define AO_PERMISSION_NAMESPACE_CREATE	(1 << 2)
+/* Change dependency on owner */
+#define AO_ACTION_CHANGE_DEP_OWNER		(1 << 3)
+
+#define AO_ACTIONS_ALL					\
+	(AO_PERMISSION_CURRENT_OWNER	|	\
+	 AO_PERMISSION_NEW_OWNER		|	\
+	 AO_PERMISSION_NAMESPACE_CREATE |	\
+	 AO_ACTION_CHANGE_DEP_OWNER)
+	
+
 extern void AlterObjectOwner_internal(Relation catalog, Oid objectId,
-						  Oid new_ownerId);
+									  Oid new_ownerId);
+
+extern void AlterObjectOwner_custom(Relation catalog, Oid objectId,
+									Oid new_ownerId, int actions);
 
 #endif   /* ALTER_H */
diff --git a/src/include/commands/defrem.h b/src/include/commands/defrem.h
index 54f67e9..78efb73 100644
--- a/src/include/commands/defrem.h
+++ b/src/include/commands/defrem.h
@@ -116,9 +116,11 @@ extern text *serialize_deflist(List *deflist);
 extern List *deserialize_deflist(Datum txt);
 
 /* commands/foreigncmds.c */
-extern ObjectAddress AlterForeignServerOwner(const char *name, Oid newOwnerId);
+extern ObjectAddress AlterForeignServerOwner(const char *name, Oid newOwnerId,
+											 bool cascade);
 extern void AlterForeignServerOwner_oid(Oid, Oid newOwnerId);
-extern ObjectAddress AlterForeignDataWrapperOwner(const char *name, Oid newOwnerId);
+extern ObjectAddress AlterForeignDataWrapperOwner(const char *name, Oid newOwnerId,
+												  bool cascade);
 extern void AlterForeignDataWrapperOwner_oid(Oid fwdId, Oid newOwnerId);
 extern ObjectAddress CreateForeignDataWrapper(CreateFdwStmt *stmt);
 extern ObjectAddress AlterForeignDataWrapper(AlterFdwStmt *stmt);
diff --git a/src/include/commands/event_trigger.h b/src/include/commands/event_trigger.h
index 0e91bf6..5b6f874 100644
--- a/src/include/commands/event_trigger.h
+++ b/src/include/commands/event_trigger.h
@@ -45,7 +45,8 @@ extern void RemoveEventTriggerById(Oid ctrigOid);
 extern Oid	get_event_trigger_oid(const char *trigname, bool missing_ok);
 
 extern Oid	AlterEventTrigger(AlterEventTrigStmt *stmt);
-extern ObjectAddress AlterEventTriggerOwner(const char *name, Oid newOwnerId);
+extern ObjectAddress AlterEventTriggerOwner(const char *name, Oid newOwnerId,
+											bool cascade);
 extern void AlterEventTriggerOwner_oid(Oid, Oid newOwnerId);
 
 extern bool EventTriggerSupportsObjectType(ObjectType obtype);
diff --git a/src/include/commands/schemacmds.h b/src/include/commands/schemacmds.h
index 97b53ec..c862a82 100644
--- a/src/include/commands/schemacmds.h
+++ b/src/include/commands/schemacmds.h
@@ -24,7 +24,7 @@ extern Oid CreateSchemaCommand(CreateSchemaStmt *parsetree,
 extern void RemoveSchemaById(Oid schemaOid);
 
 extern ObjectAddress RenameSchema(const char *oldname, const char *newname);
-extern ObjectAddress AlterSchemaOwner(const char *name, Oid newOwnerId);
+extern ObjectAddress AlterSchemaOwner(const char *name, Oid newOwnerId, bool cascade);
 extern void AlterSchemaOwner_oid(Oid schemaOid, Oid newOwnerId);
 
 #endif   /* SCHEMACMDS_H */
diff --git a/src/include/commands/tablecmds.h b/src/include/commands/tablecmds.h
index 7a770f4..833b311 100644
--- a/src/include/commands/tablecmds.h
+++ b/src/include/commands/tablecmds.h
@@ -35,6 +35,9 @@ extern LOCKMODE AlterTableGetLockLevel(List *cmds);
 
 extern void ATExecChangeOwner(Oid relationOid, Oid newOwnerId, bool recursing, LOCKMODE lockmode);
 
+extern void change_owner_fix_column_acls(Oid relationOid,
+							 Oid oldOwnerId, Oid newOwnerId);
+
 extern void AlterTableInternal(Oid relid, List *cmds, bool recurse);
 
 extern Oid	AlterTableMoveAll(AlterTableMoveAllStmt *stmt);
diff --git a/src/include/commands/typecmds.h b/src/include/commands/typecmds.h
index e4c86f1..89b4dbd 100644
--- a/src/include/commands/typecmds.h
+++ b/src/include/commands/typecmds.h
@@ -42,7 +42,8 @@ extern void checkDomainOwner(HeapTuple tup);
 
 extern ObjectAddress RenameType(RenameStmt *stmt);
 
-extern ObjectAddress AlterTypeOwner(List *names, Oid newOwnerId, ObjectType objecttype);
+extern ObjectAddress AlterTypeOwner(List *names, Oid newOwnerId, ObjectType objecttype,
+									bool cascade);
 extern void AlterTypeOwner_oid(Oid typeOid, Oid newOwnerId, bool hasDependEntry);
 extern void AlterTypeOwnerInternal(Oid typeOid, Oid newOwnerId);
 
diff --git a/src/include/nodes/parsenodes.h b/src/include/nodes/parsenodes.h
index 2fd0629..1a4f839 100644
--- a/src/include/nodes/parsenodes.h
+++ b/src/include/nodes/parsenodes.h
@@ -1540,6 +1540,8 @@ typedef struct AlterTableCmd	/* one subcommand of an ALTER TABLE */
 	char	   *name;			/* column, constraint, or trigger to act on,
 								 * or tablespace */
 	Node	   *newowner;		/* RoleSpec */
+	bool		cascade;
+	
 	Node	   *def;			/* definition of new column, index,
 								 * constraint, or parent table */
 	DropBehavior behavior;		/* RESTRICT or CASCADE for DROP cases */
@@ -2549,6 +2551,7 @@ typedef struct AlterOwnerStmt
 	List	   *object;			/* in case it's some other object */
 	List	   *objarg;			/* argument types, if applicable */
 	Node	   *newowner;		/* the new owner */
+	bool		cascade;
 } AlterOwnerStmt;
 
 
diff --git a/src/test/regress/expected/alter_owner_cascade.out b/src/test/regress/expected/alter_owner_cascade.out
new file mode 100644
index 0000000..009cd3d
--- /dev/null
+++ b/src/test/regress/expected/alter_owner_cascade.out
@@ -0,0 +1,209 @@
+SET client_min_messages TO 'warning';
+DROP ROLE IF EXISTS regtest_alter_casc_user1;
+DROP ROLE IF EXISTS regtest_alter_casc_user2;
+DROP ROLE IF EXISTS regtest_alter_casc_user3;
+RESET client_min_messages;
+CREATE USER regtest_alter_casc_user3;
+CREATE USER regtest_alter_casc_user2;
+CREATE USER regtest_alter_casc_user1 IN ROLE regtest_alter_casc_user3;
+CREATE SCHEMA alt_casc_nsp1;
+CREATE SCHEMA alt_casc_nsp2;
+GRANT ALL ON SCHEMA alt_casc_nsp1, alt_casc_nsp2 TO public;
+SET search_path = alt_casc_nsp1, public;
+--check tables
+SET SESSION AUTHORIZATION regtest_alter_casc_user1;
+CREATE TABLE alt_casc_nsp1.test_1(val text);
+GRANT INSERT (val) ON alt_casc_nsp1.test_1 TO regtest_alter_casc_user2;
+CREATE INDEX test_1_idx_1 ON alt_casc_nsp1.test_1(val);
+CREATE TABLE alt_casc_nsp1.test_2(val1 text, val2 int) INHERITS(alt_casc_nsp1.test_1);
+GRANT INSERT (val2) ON alt_casc_nsp1.test_2 TO regtest_alter_casc_user2;
+-- this ownership transfer should fail: index should belong to table owner
+ALTER TABLE alt_casc_nsp1.test_1_idx_1 OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+WARNING:  cannot change owner of index "test_1_idx_1"
+HINT:  Change the ownership of the index's table, instead.
+ALTER TABLE alt_casc_nsp1.test_1 OWNER TO regtest_alter_casc_user3 CASCADE; --ok
+NOTICE:  alter table owner cascades to table test_2
+RESET SESSION AUTHORIZATION;
+-- check auto AND inner dependencies
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_1'::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = 'test_1_idx_1'::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1[]'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_1'::regclass::oid)::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_2'::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2[]'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_2'::regclass::oid)::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+-- check attribute ACLs
+SELECT attacl FROM pg_attribute WHERE attname = 'val' AND attrelid = 'alt_casc_nsp1.test_1'::regclass;
+                        attacl                         
+-------------------------------------------------------
+ {regtest_alter_casc_user2=a/regtest_alter_casc_user3}
+(1 row)
+
+SELECT attacl FROM pg_attribute WHERE attname = 'val2' AND attrelid = 'alt_casc_nsp1.test_2'::regclass;
+                        attacl                         
+-------------------------------------------------------
+ {regtest_alter_casc_user2=a/regtest_alter_casc_user3}
+(1 row)
+
+SET SESSION AUTHORIZATION regtest_alter_casc_user2;
+ALTER TABLE alt_casc_nsp1.test_1 OWNER TO regtest_alter_casc_user1 CASCADE;
+ERROR:  must be owner of relation test_1
+RESET SESSION AUTHORIZATION;
+ALTER TABLE alt_casc_nsp1.test_1 OWNER TO regtest_alter_casc_user1;
+-- check auto AND inner dependencies
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_1'::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user1
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = 'test_1_idx_1'::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user1
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user1
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1[]'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user1
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_1'::regclass::oid)::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user1
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_2'::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2[]'::regtype;
+         typowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_2'::regclass::oid)::regclass;
+         relowner         
+--------------------------
+ regtest_alter_casc_user3
+(1 row)
+
+-- check attribute ACLs again
+SELECT attacl FROM pg_attribute WHERE attname = 'val' AND attrelid = 'alt_casc_nsp1.test_1'::regclass;
+                        attacl                         
+-------------------------------------------------------
+ {regtest_alter_casc_user2=a/regtest_alter_casc_user1}
+(1 row)
+
+SELECT attacl FROM pg_attribute WHERE attname = 'val2' AND attrelid = 'alt_casc_nsp1.test_2'::regclass;
+                        attacl                         
+-------------------------------------------------------
+ {regtest_alter_casc_user2=a/regtest_alter_casc_user3}
+(1 row)
+
+--check types
+SET SESSION AUTHORIZATION regtest_alter_casc_user1;
+CREATE TYPE alt_casc_nsp1.comp_type1 AS (a real, b real);
+CREATE FUNCTION alt_casc_nsp1.comp_type_func(carg alt_casc_nsp1.comp_type1) returns text
+AS $$ begin return 'ok'; end; $$ language plpgsql;
+CREATE TABLE alt_casc_nsp1.comp_type_table(val int, cval alt_casc_nsp1.comp_type1);
+ALTER TYPE alt_casc_nsp1.comp_type_table OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+ERROR:  comp_type_table is a table's row type
+HINT:  Use ALTER TABLE instead.
+ALTER DOMAIN alt_casc_nsp1.comp_type1 OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+ERROR:  comp_type1 is not a domain
+ALTER TYPE alt_casc_nsp1.comp_type1 OWNER TO regtest_alter_casc_user3 CASCADE; --ok
+NOTICE:  alter type owner cascades to function comp_type_func(comp_type1)
+RESET SESSION AUTHORIZATION;
+--check schemas
+SET SESSION AUTHORIZATION regtest_alter_casc_user1;
+ALTER SCHEMA alt_casc_nsp1 OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+ERROR:  must be owner of schema alt_casc_nsp1
+RESET SESSION AUTHORIZATION;
+ALTER SCHEMA alt_casc_nsp1 OWNER TO regtest_alter_casc_user1 CASCADE; --ok
+NOTICE:  alter schema owner cascades to 5 other objects
+DETAIL:  alter schema owner cascades to table test_1
+alter schema owner cascades to table test_2
+alter schema owner cascades to type comp_type1
+alter schema owner cascades to function comp_type_func(comp_type1)
+alter schema owner cascades to table comp_type_table
+ALTER TABLE alt_casc_nsp1.test_2 OWNER TO regtest_alter_casc_user2;
+SET SESSION AUTHORIZATION regtest_alter_casc_user3;
+ALTER SCHEMA alt_casc_nsp1 OWNER TO regtest_alter_casc_user1 CASCADE; --fails
+ERROR:  must be owner of relation test_2
+RESET SESSION AUTHORIZATION;
+DROP SCHEMA alt_casc_nsp1 CASCADE;
+NOTICE:  drop cascades to 5 other objects
+DETAIL:  drop cascades to table test_1
+drop cascades to table test_2
+drop cascades to type comp_type1
+drop cascades to function comp_type_func(comp_type1)
+drop cascades to table comp_type_table
+DROP SCHEMA alt_casc_nsp2 CASCADE;
+DROP USER regtest_alter_casc_user1;
+DROP USER regtest_alter_casc_user2;
+DROP USER regtest_alter_casc_user3;
diff --git a/src/test/regress/parallel_schedule b/src/test/regress/parallel_schedule
index bec0316..74855e9 100644
--- a/src/test/regress/parallel_schedule
+++ b/src/test/regress/parallel_schedule
@@ -89,7 +89,7 @@ test: brin gin gist spgist privileges security_label collate matview lock replic
 # ----------
 # Another group of parallel tests
 # ----------
-test: alter_generic alter_operator misc psql async dbsize misc_functions
+test: alter_generic alter_owner_cascade alter_operator misc psql async dbsize misc_functions
 
 # rules cannot run concurrently with any test that creates a view
 test: rules
diff --git a/src/test/regress/serial_schedule b/src/test/regress/serial_schedule
index 7e9b319..bbaa64b 100644
--- a/src/test/regress/serial_schedule
+++ b/src/test/regress/serial_schedule
@@ -114,6 +114,7 @@ test: rowsecurity
 test: object_address
 test: tablesample
 test: alter_generic
+test: alter_owner_cascade
 test: alter_operator
 test: misc
 test: psql
diff --git a/src/test/regress/sql/alter_owner_cascade.sql b/src/test/regress/sql/alter_owner_cascade.sql
new file mode 100644
index 0000000..5af21dd
--- /dev/null
+++ b/src/test/regress/sql/alter_owner_cascade.sql
@@ -0,0 +1,114 @@
+SET client_min_messages TO 'warning';
+
+DROP ROLE IF EXISTS regtest_alter_casc_user1;
+DROP ROLE IF EXISTS regtest_alter_casc_user2;
+DROP ROLE IF EXISTS regtest_alter_casc_user3;
+
+RESET client_min_messages;
+
+CREATE USER regtest_alter_casc_user3;
+CREATE USER regtest_alter_casc_user2;
+CREATE USER regtest_alter_casc_user1 IN ROLE regtest_alter_casc_user3;
+
+CREATE SCHEMA alt_casc_nsp1;
+CREATE SCHEMA alt_casc_nsp2;
+
+GRANT ALL ON SCHEMA alt_casc_nsp1, alt_casc_nsp2 TO public;
+
+SET search_path = alt_casc_nsp1, public;
+
+
+--check tables
+SET SESSION AUTHORIZATION regtest_alter_casc_user1;
+
+CREATE TABLE alt_casc_nsp1.test_1(val text);
+GRANT INSERT (val) ON alt_casc_nsp1.test_1 TO regtest_alter_casc_user2;
+CREATE INDEX test_1_idx_1 ON alt_casc_nsp1.test_1(val);
+CREATE TABLE alt_casc_nsp1.test_2(val1 text, val2 int) INHERITS(alt_casc_nsp1.test_1);
+GRANT INSERT (val2) ON alt_casc_nsp1.test_2 TO regtest_alter_casc_user2;
+
+-- this ownership transfer should fail: index should belong to table owner
+ALTER TABLE alt_casc_nsp1.test_1_idx_1 OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+
+ALTER TABLE alt_casc_nsp1.test_1 OWNER TO regtest_alter_casc_user3 CASCADE; --ok
+
+RESET SESSION AUTHORIZATION;
+
+-- check auto AND inner dependencies
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_1'::regclass;
+SELECT relowner::regrole FROM pg_class WHERE oid = 'test_1_idx_1'::regclass;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1'::regtype;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1[]'::regtype;
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_1'::regclass::oid)::regclass;
+
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_2'::regclass;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2'::regtype;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2[]'::regtype;
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_2'::regclass::oid)::regclass;
+
+-- check attribute ACLs
+SELECT attacl FROM pg_attribute WHERE attname = 'val' AND attrelid = 'alt_casc_nsp1.test_1'::regclass;
+SELECT attacl FROM pg_attribute WHERE attname = 'val2' AND attrelid = 'alt_casc_nsp1.test_2'::regclass;
+
+SET SESSION AUTHORIZATION regtest_alter_casc_user2;
+ALTER TABLE alt_casc_nsp1.test_1 OWNER TO regtest_alter_casc_user1 CASCADE;
+RESET SESSION AUTHORIZATION;
+
+ALTER TABLE alt_casc_nsp1.test_1 OWNER TO regtest_alter_casc_user1;
+
+-- check auto AND inner dependencies
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_1'::regclass;
+SELECT relowner::regrole FROM pg_class WHERE oid = 'test_1_idx_1'::regclass;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1'::regtype;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_1[]'::regtype;
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_1'::regclass::oid)::regclass;
+
+SELECT relowner::regrole FROM pg_class WHERE oid = 'alt_casc_nsp1.test_2'::regclass;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2'::regtype;
+SELECT typowner::regrole FROM pg_type WHERE oid = 'alt_casc_nsp1.test_2[]'::regtype;
+SELECT relowner::regrole FROM pg_class WHERE oid = ('pg_toast.pg_toast_' || 'alt_casc_nsp1.test_2'::regclass::oid)::regclass;
+
+-- check attribute ACLs again
+SELECT attacl FROM pg_attribute WHERE attname = 'val' AND attrelid = 'alt_casc_nsp1.test_1'::regclass;
+SELECT attacl FROM pg_attribute WHERE attname = 'val2' AND attrelid = 'alt_casc_nsp1.test_2'::regclass;
+
+
+--check types
+SET SESSION AUTHORIZATION regtest_alter_casc_user1;
+
+CREATE TYPE alt_casc_nsp1.comp_type1 AS (a real, b real);
+
+CREATE FUNCTION alt_casc_nsp1.comp_type_func(carg alt_casc_nsp1.comp_type1) returns text
+AS $$ begin return 'ok'; end; $$ language plpgsql;
+
+CREATE TABLE alt_casc_nsp1.comp_type_table(val int, cval alt_casc_nsp1.comp_type1);
+
+ALTER TYPE alt_casc_nsp1.comp_type_table OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+
+ALTER DOMAIN alt_casc_nsp1.comp_type1 OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+
+ALTER TYPE alt_casc_nsp1.comp_type1 OWNER TO regtest_alter_casc_user3 CASCADE; --ok
+
+RESET SESSION AUTHORIZATION;
+
+
+--check schemas
+SET SESSION AUTHORIZATION regtest_alter_casc_user1;
+ALTER SCHEMA alt_casc_nsp1 OWNER TO regtest_alter_casc_user3 CASCADE; --fails
+RESET SESSION AUTHORIZATION;
+
+ALTER SCHEMA alt_casc_nsp1 OWNER TO regtest_alter_casc_user1 CASCADE; --ok
+
+ALTER TABLE alt_casc_nsp1.test_2 OWNER TO regtest_alter_casc_user2;
+SET SESSION AUTHORIZATION regtest_alter_casc_user3;
+ALTER SCHEMA alt_casc_nsp1 OWNER TO regtest_alter_casc_user1 CASCADE; --fails
+RESET SESSION AUTHORIZATION;
+
+
+
+DROP SCHEMA alt_casc_nsp1 CASCADE;
+DROP SCHEMA alt_casc_nsp2 CASCADE;
+
+DROP USER regtest_alter_casc_user1;
+DROP USER regtest_alter_casc_user2;
+DROP USER regtest_alter_casc_user3;
\ No newline at end of file
-- 
Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)
To make changes to your subscription:
http://www.postgresql.org/mailpref/pgsql-hackers

Reply via email to