On Sun, Nov 2, 2014 at 7:31 AM, Simon Riggs <si...@2ndquadrant.com> wrote:
> The procgloballist stuff should be the subject of a separate patch
> which I agree with.
Yes, I think that's probably a net improvement in robustness quite apart from what we decide to do about any of the rest of this. I've attached it here as revise-procglobal-tracking.patch and will commit that bit if nobody objects. The remainder is reattached without change as group-locking-v0.1.patch.

Per your other comment, I've developed the beginnings of a testing framework, which I've attached here as test_group_locking-v0.patch. That doesn't look to have much hope of evolving into something we'd want even in contrib, but I think it'll be rather useful for debugging. It works like this:

rhaas=# create table foo (a int);
CREATE TABLE
rhaas=# select test_group_locking('1.0:start,2.0:start,1.0:lock:AccessExclusiveLock:foo,2.0:lock:AccessExclusiveLock:foo');
NOTICE: starting worker 1.0
NOTICE: starting worker 2.0
NOTICE: instructing worker 1.0 to acquire AccessExclusiveLock on relation with OID 16387
NOTICE: instructing worker 2.0 to acquire AccessExclusiveLock on relation with OID 16387
ERROR: could not obtain AccessExclusiveLock on relation with OID 16387
CONTEXT: background worker, group 2, task 0

The syntax is a little arcane, I guess, but it's "documented" in the comments within. In this case I asked it to start up two background workers and have them both try to take AccessExclusiveLock on table foo. As expected, the second one fails. The idea is that workers are identified by a pair of numbers X.Y; two workers with the same X value are in the same locking group. So if I call the second worker 1.1 rather than 2.0, it'll join the same locking group as worker 1.0 and ... then it does the wrong thing, and then it crashes the server, because my completely untested code is unsurprisingly riddled with bugs. Eventually, this needs to be generalized a bit so that we can use it to test deadlock detection.
That's tricky, because what you really want to do is tell worker A to wait for some lock and then, once you're sure it's on the wait queue, tell worker B to go take some other lock and check that you see the resulting deadlock. There doesn't seem to be a good API for the user backend to find out whether some background worker is waiting for some particular lock, so I may have to resort to the hacky expedient of having the driver process wait for a few seconds and assume that's long enough that the background worker will be on the wait queue by then. Or maybe I can drum up some solution, but anyway it's not done yet. The value of this test code is that we can easily reproduce locking scenarios that would be hard to reproduce in a real workload, e.g. because they're timing-dependent.

-- 
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company
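The real grammar for the test_group_locking() instruction string lives in the comments of test_group_locking-v0.patch, which is not reproduced here. As a rough illustration only, here is a minimal sketch of how such a string might be parsed, assuming (from the example call above and nothing else) that each comma-separated item has the form X.Y:action or X.Y:action:lockmode:relation. The TestStep type and the parse_test_step()/parse_test_spec() functions are hypothetical and are not taken from the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical representation of one item, e.g. "1.0:lock:AccessExclusiveLock:foo". */
typedef struct TestStep
{
	int		group;			/* X: workers with the same X share a lock group */
	int		task;			/* Y: distinguishes workers within one group */
	char	action[16];		/* "start", "lock", ... */
	char	lockmode[32];	/* empty unless the item names a lock mode */
	char	relname[64];	/* empty unless the item names a relation */
} TestStep;

/* Parse one item; returns false unless it is X.Y:action or X.Y:action:mode:rel. */
static bool
parse_test_step(const char *item, TestStep *step)
{
	int		nfields;

	assert(item != NULL && step != NULL);
	memset(step, 0, sizeof(TestStep));
	nfields = sscanf(item, "%d.%d:%15[^:]:%31[^:]:%63s",
					 &step->group, &step->task, step->action,
					 step->lockmode, step->relname);
	return nfields == 3 || nfields == 5;
}

/* Split a comma-separated spec; returns the number of steps, or -1 on error. */
static int
parse_test_spec(const char *spec, TestStep *steps, int maxsteps)
{
	char	item[256];
	int		nsteps = 0;

	while (*spec != '\0')
	{
		size_t	len = strcspn(spec, ",");

		if (len >= sizeof(item) || nsteps >= maxsteps)
			return -1;
		memcpy(item, spec, len);
		item[len] = '\0';
		if (!parse_test_step(item, &steps[nsteps]))
			return -1;
		nsteps++;
		spec += len;
		if (*spec == ',')
			spec++;				/* skip the separator */
	}
	return nsteps;
}
```

Under this reading, renaming the second worker from 2.0 to 1.1 gives both lock steps group == 1, which is what places them in the same lock group.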
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 723051e..85997f6 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -210,6 +210,10 @@ static bool FastPathUnGrantRelationLock(Oid relid, LOCKMODE lockmode); static bool FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag, uint32 hashcode); static PROCLOCK *FastPathGetRelationLockEntry(LOCALLOCK *locallock); +static bool GroupLockShouldJumpQueue(LockMethod lockMethodTable, + LOCKMODE lockmode, + LOCK *lock, + PROCLOCK *proclock); /* * To make the fast-path lock mechanism work, we must have some way of @@ -339,7 +343,8 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) static uint32 proclock_hash(const void *key, Size keysize); static void RemoveLocalLock(LOCALLOCK *locallock); static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, - const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode); + PGPROC *leader, const LOCKTAG *locktag, uint32 hashcode, + LOCKMODE lockmode); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); static void FinishStrongLockAcquire(void); @@ -894,8 +899,8 @@ LockAcquireExtended(const LOCKTAG *locktag, * away anytime. So we have to use SetupLockInTable() to recompute the * lock and proclock pointers, even if they're already set. */ - proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, - hashcode, lockmode); + proclock = SetupLockInTable(lockMethodTable, MyProc, LockGroupLeader, + locktag, hashcode, lockmode); if (!proclock) { AbortStrongLockAcquire(); @@ -914,18 +919,27 @@ LockAcquireExtended(const LOCKTAG *locktag, /* * If lock requested conflicts with locks requested by waiters, must join - * wait queue. Otherwise, check for conflict with already-held locks. - * (That's last because most complex check.) 
+ * wait queue (except for certain cases involving group locking, where + * new lockers must sometimes jump the entire wait queue to avoid + * deadlock). Otherwise, we can grant ourselves the lock if there are + * no conflicts. */ if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) - status = STATUS_FOUND; + { + if (proclock->groupLeader != NULL && + GroupLockShouldJumpQueue(lockMethodTable, lockmode, lock, + proclock)) + status = STATUS_OK; + else + status = STATUS_FOUND; + } else status = LockCheckConflicts(lockMethodTable, lockmode, lock, proclock); if (status == STATUS_OK) { - /* No conflict with held or previously requested locks */ + /* We can and should grant ourselves the lock at once */ GrantLock(lock, proclock, lockmode); GrantLockLocal(locallock, owner); } @@ -1053,7 +1067,7 @@ LockAcquireExtended(const LOCKTAG *locktag, * held at exit. */ static PROCLOCK * -SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, +SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, PGPROC *leader, const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode) { LOCK *lock; @@ -1141,6 +1155,7 @@ SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, { uint32 partition = LockHashPartition(hashcode); + proclock->groupLeader = leader; proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ @@ -1258,9 +1273,10 @@ RemoveLocalLock(LOCALLOCK *locallock) * NOTES: * Here's what makes this complicated: one process's locks don't * conflict with one another, no matter what purpose they are held for - * (eg, session and transaction locks do not conflict). - * So, we must subtract off our own locks when determining whether the - * requested new lock conflicts with those already held. + * (eg, session and transaction locks do not conflict). Nor do the locks + * of one process in a lock group conflict with those of another process in + * the same group. 
So, we must subtract off these locks when determining + * whether the requested new lock conflicts with those already held. */ int LockCheckConflicts(LockMethod lockMethodTable, @@ -1270,8 +1286,12 @@ LockCheckConflicts(LockMethod lockMethodTable, { int numLockModes = lockMethodTable->numLockModes; LOCKMASK myLocks; - LOCKMASK otherLocks; + int conflictMask = lockMethodTable->conflictTab[lockmode]; + int conflictsRemaining[MAX_LOCKMODES]; + int totalConflictsRemaining = 0; int i; + SHM_QUEUE *procLocks; + PROCLOCK *otherproclock; /* * first check for global conflicts: If no locks conflict with my request, @@ -1282,44 +1302,215 @@ LockCheckConflicts(LockMethod lockMethodTable, * type of lock that conflicts with request. Bitwise compare tells if * there is a conflict. */ - if (!(lockMethodTable->conflictTab[lockmode] & lock->grantMask)) + if (!(conflictMask & lock->grantMask)) { PROCLOCK_PRINT("LockCheckConflicts: no conflict", proclock); return STATUS_OK; } /* - * Rats. Something conflicts. But it could still be my own lock. We have - * to construct a conflict mask that does not reflect our own locks, but - * only lock types held by other processes. + * Rats. Something conflicts. But it could still be my own lock, or + * a lock held by another member of my locking group. First, figure out + * how many conflicts remain after subtracting out any locks I hold + * myself. */ myLocks = proclock->holdMask; - otherLocks = 0; for (i = 1; i <= numLockModes; i++) { - int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0; + if ((conflictMask & LOCKBIT_ON(i)) == 0) + { + conflictsRemaining[i] = 0; + continue; + } + conflictsRemaining[i] = lock->granted[i]; + if (myLocks & LOCKBIT_ON(i)) + --conflictsRemaining[i]; + totalConflictsRemaining += conflictsRemaining[i]; + } - if (lock->granted[i] > myHolding) - otherLocks |= LOCKBIT_ON(i); + /* If no conflicts remain, we get the lock. 
*/ + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (simple)", proclock); + return STATUS_OK; + } + + /* If we're not using group locking, this is definitely a conflict. */ + if (proclock->groupLeader == NULL) + { + PROCLOCK_PRINT("LockCheckConflicts: conflicting (simple)", proclock); + return STATUS_FOUND; + } + + /* Important special case: we're the only member of a lock group. */ + if (proclock->groupLeader == MyProc && MyProc->lockGroupMembers < 2) + { + Assert(proclock->tag.myProc == MyProc); + Assert(MyProc->lockGroupMembers == 1); + PROCLOCK_PRINT("LockCheckConflicts: conflicting (trivial group)", + proclock); + return STATUS_FOUND; } /* - * now check again for conflicts. 'otherLocks' describes the types of - * locks held by other processes. If one of these conflicts with the kind - * of lock that I want, there is a conflict and I have to sleep. + * Locks held in conflicting modes by members of our own lock group are + * not real conflicts; we can subtract those out and see if we still have + * a conflict. This is O(N) in the number of processes holding or awaiting + * locks on this object. We could improve that by making the shared memory + * state more complex (and larger) but it doesn't seem worth it. */ - if (!(lockMethodTable->conflictTab[lockmode] & otherLocks)) + procLocks = &(lock->procLocks); + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); + while (otherproclock != NULL) { - /* no conflict. 
OK to get the lock */ - PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock); - return STATUS_OK; + if (proclock != otherproclock && + proclock->groupLeader == otherproclock->groupLeader && + (otherproclock->holdMask & conflictMask) != 0) + { + int intersectMask = otherproclock->holdMask & conflictMask; + + for (i = 1; i <= numLockModes; i++) + { + if ((intersectMask & LOCKBIT_ON(i)) != 0) + { + if (conflictsRemaining[i] <= 0) + elog(PANIC, "proclocks held do not match lock"); + conflictsRemaining[i]--; + totalConflictsRemaining--; + } + } + + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("LockCheckConflicts: resolved (group)", + proclock); + return STATUS_OK; + } + } + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, &otherproclock->lockLink, + offsetof(PROCLOCK, lockLink)); } - PROCLOCK_PRINT("LockCheckConflicts: conflicting", proclock); + /* Nope, it's a real conflict. */ + PROCLOCK_PRINT("LockCheckConflicts: conflicting (group)", proclock); return STATUS_FOUND; } /* + * GroupLockShouldJumpQueue -- should a group lock be granted without + * waiting, despite the presence of conflicting waiters? + * + * If some member of our locking group already holds a lock on the object, + * then we should skip the wait queue and grant ourselves the lock immediately. + * This is because we presume lock group members will eventually wait for + * each other; thus, if we didn't do this, such situations would result in + * an eventual deadlock. However, if a conflicting lock is present that is + * not held by another member of our lock group, then we can't do this. + * In that case we'll have to wait despite the deadlock risk and hope for + * the best.
+ */ +static bool +GroupLockShouldJumpQueue(LockMethod lockMethodTable, + LOCKMODE lockmode, + LOCK *lock, + PROCLOCK *proclock) +{ + int numLockModes = lockMethodTable->numLockModes; + LOCKMASK myLocks; + int conflictMask = lockMethodTable->conflictTab[lockmode]; + int conflictsRemaining[MAX_LOCKMODES]; + int totalConflictsRemaining = 0; + int i; + SHM_QUEUE *procLocks; + PROCLOCK *otherproclock; + + /* + * If we're the only member of the lock group, then clearly no other + * member holds a lock. We should NOT jump the queue. + */ + if (proclock->groupLeader == MyProc && MyProc->lockGroupMembers < 2) + { + Assert(proclock->tag.myProc == MyProc); + Assert(MyProc->lockGroupMembers == 1); + PROCLOCK_PRINT("GroupLockShouldJumpQueue: trivial group", proclock); + return false; + } + + /* Count the number of lock conflicts, excluding my own locks. */ + myLocks = proclock->holdMask; + for (i = 1; i <= numLockModes; i++) + { + if ((conflictMask & LOCKBIT_ON(i)) == 0) + { + conflictsRemaining[i] = 0; + continue; + } + conflictsRemaining[i] = lock->granted[i]; + if (myLocks & LOCKBIT_ON(i)) + --conflictsRemaining[i]; + totalConflictsRemaining += conflictsRemaining[i]; + } + + /* + * Search for locks held by other group members. Even if there are + * no conflicts, we can't exit early yet, because we don't know whether + * any group member actually holds a lock. + */ + procLocks = &(lock->procLocks); + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, lockLink)); + while (otherproclock != NULL) + { + if (proclock != otherproclock && + proclock->groupLeader == otherproclock->groupLeader && + otherproclock->holdMask != 0) + { + int intersectMask = otherproclock->holdMask & conflictMask; + + /* + * Does the group member hold a lock in 1 or more conflicting + * modes? If so, reduce the count of remaining conflicts by the + * number of such modes. 
+ */ + if (intersectMask != 0) + { + for (i = 1; i <= numLockModes; i++) + { + if ((intersectMask & LOCKBIT_ON(i)) != 0) + { + if (conflictsRemaining[i] <= 0) + elog(PANIC, "proclocks held do not match lock"); + conflictsRemaining[i]--; + totalConflictsRemaining--; + } + } + } + + /* + * Whether there were any conflicting modes here or not, the fact + * that the lock is held at all makes us eligible to jump the + * queue. But we can only do that once the absence of conflicts + * is established. + */ + if (totalConflictsRemaining == 0) + { + PROCLOCK_PRINT("GroupLockShouldJumpQueue: jump", proclock); + return true; + } + } + otherproclock = (PROCLOCK *) + SHMQueueNext(procLocks, &otherproclock->lockLink, + offsetof(PROCLOCK, lockLink)); + } + + /* Either no group members hold locks, or there are conflicts. */ + PROCLOCK_PRINT("GroupLockShouldJumpQueue: fallthrough", proclock); + return false; +} + +/* * GrantLock -- update the lock and proclock data structures to show * the lock request has been granted. * @@ -2534,7 +2725,8 @@ FastPathTransferRelationLocks(LockMethod lockMethodTable, const LOCKTAG *locktag if (!FAST_PATH_CHECK_LOCKMODE(proc, f, lockmode)) continue; - proclock = SetupLockInTable(lockMethodTable, proc, locktag, + proclock = SetupLockInTable(lockMethodTable, proc, + proc->lockGroupLeader, locktag, hashcode, lockmode); if (!proclock) { @@ -2590,8 +2782,8 @@ FastPathGetRelationLockEntry(LOCALLOCK *locallock) /* Find or create lock object. */ LWLockAcquire(partitionLock, LW_EXCLUSIVE); - proclock = SetupLockInTable(lockMethodTable, MyProc, locktag, - locallock->hashcode, lockmode); + proclock = SetupLockInTable(lockMethodTable, MyProc, LockGroupLeader, + locktag, locallock->hashcode, lockmode); if (!proclock) { LWLockRelease(partitionLock); @@ -3094,6 +3286,9 @@ PostPrepare_Locks(TransactionId xid) PROCLOCKTAG proclocktag; int partition; + /* Can't prepare a lock group follower.
*/ + Assert(LockGroupLeader == MyProc || LockGroupLeader == NULL); + + /* This is a critical section: any error means big trouble */ START_CRIT_SECTION(); @@ -3195,6 +3390,19 @@ PostPrepare_Locks(TransactionId xid) Assert(proclock->tag.myProc == MyProc); + /* + * We shouldn't be in a lock group, except for a single-entry + * group for which we're the leader, which is OK. We need to + * clear the groupLeader pointer in that case, so that the dummy + * PGPROC doesn't end up pointing back to our PGPROC. + */ + if (proclock->groupLeader != NULL) + { + Assert(proclock->groupLeader == MyProc); + Assert(MyProc->lockGroupMembers == 1); + proclock->groupLeader = NULL; + } + lock = proclock->tag.myLock; /* Ignore VXID locks */ @@ -3784,6 +3992,7 @@ lock_twophase_recover(TransactionId xid, uint16 info, */ if (!found) { + proclock->groupLeader = NULL; proclock->holdMask = 0; proclock->releaseMask = 0; /* Add proclock to appropriate lists */ @@ -4058,7 +4267,8 @@ VirtualXactLock(VirtualTransactionId vxid, bool wait) LWLockAcquire(partitionLock, LW_EXCLUSIVE); proclock = SetupLockInTable(LockMethods[DEFAULT_LOCKMETHOD], proc, - &tag, hashcode, ExclusiveLock); + proc->lockGroupLeader, &tag, hashcode, + ExclusiveLock); if (!proclock) { LWLockRelease(partitionLock); diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index de7608a..1af9851 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -64,6 +64,13 @@ PGPROC *MyProc = NULL; PGXACT *MyPgXact = NULL; /* + * If we're not in a lock group, LockGroupLeader will be NULL. Otherwise, + * it should be set to the leader of the lock group of which we're a + * member. This will be the same as MyProc iff we're the group leader. + */ +PGPROC *LockGroupLeader = NULL; + +/* * This spinlock protects the freelist of recycled PGPROC structures. * We cannot use an LWLock because the LWLock manager depends on already * having a PGPROC and a wait semaphore!
But these structures are touched @@ -383,6 +390,7 @@ InitProcess(void) MyProc->lwWaitLink = NULL; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + Assert(MyProc->lockGroupLeader == NULL); #ifdef USE_ASSERT_CHECKING { int i; @@ -546,6 +554,7 @@ InitAuxiliaryProcess(void) MyProc->lwWaitLink = NULL; MyProc->waitLock = NULL; MyProc->waitProcLock = NULL; + Assert(MyProc->lockGroupLeader == NULL); #ifdef USE_ASSERT_CHECKING { int i; @@ -782,23 +791,12 @@ ProcKill(int code, Datum arg) /* use volatile pointer to prevent code rearrangement */ volatile PROC_HDR *procglobal = ProcGlobal; PGPROC *proc; - PGPROC * volatile * procgloballist; Assert(MyProc != NULL); /* Make sure we're out of the sync rep lists */ SyncRepCleanupAtProcExit(); -#ifdef USE_ASSERT_CHECKING - { - int i; - - /* Last process should have released all locks. */ - for (i = 0; i < NUM_LOCK_PARTITIONS; i++) - Assert(SHMQueueEmpty(&(MyProc->myProcLocks[i]))); - } -#endif - /* * Release any LW locks I am holding. There really shouldn't be any, but * it's cheap to check again before we cut the knees off the LWLock @@ -810,6 +808,50 @@ ProcKill(int code, Datum arg) if (MyReplicationSlot != NULL) ReplicationSlotRelease(); + /* If we're a lock group member, detach from the lock group. */ + if (LockGroupLeader != NULL && LockGroupLeader != MyProc) + { + int members; + + LWLockAcquire(LockGroupLeader->backendLock, LW_EXCLUSIVE); + members = --LockGroupLeader->lockGroupMembers; + LWLockRelease(LockGroupLeader->backendLock); + + LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE); + MyProc->lockGroupLeader = NULL; + LWLockRelease(MyProc->backendLock); + + /* If we're the last member of the lock group, detach the PGPROC. */ + if (members == 0) + { + PGPROC * volatile * procgloballist; + +#ifdef USE_ASSERT_CHECKING + { + int i; + + /* Last process should have released all locks. 
*/ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + Assert(SHMQueueEmpty(&(LockGroupLeader->myProcLocks[i]))); + } +#endif + + procgloballist = LockGroupLeader->procgloballist; + + SpinLockAcquire(ProcStructLock); + + /* Return PGPROC structure to appropriate freelist */ + LockGroupLeader->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = LockGroupLeader; + + /* Update shared estimate of spins_per_delay */ + procglobal->spins_per_delay = + update_spins_per_delay(procglobal->spins_per_delay); + + SpinLockRelease(ProcStructLock); + } + } + /* * Clear MyProc first; then disown the process latch. This is so that * signal handlers won't try to clear the process latch after it's no @@ -819,17 +861,55 @@ ProcKill(int code, Datum arg) MyProc = NULL; DisownLatch(&proc->procLatch); - procgloballist = proc->procgloballist; - SpinLockAcquire(ProcStructLock); + /* + * If we are a lock group leader, we need to check whether any other + * group members are active. If not, we can declare ourselves to no longer + * be a lock group leader, allowing our PGPROC to be recycled + * immediately. + */ + if (LockGroupLeader == proc) + { + int members; - /* Return PGPROC structure (and semaphore) to appropriate freelist */ - proc->links.next = (SHM_QUEUE *) *procgloballist; - *procgloballist = proc; + LWLockAcquire(proc->backendLock, LW_EXCLUSIVE); + members = --proc->lockGroupMembers; + LWLockRelease(proc->backendLock); - /* Update shared estimate of spins_per_delay */ - procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay); + LockGroupLeader = NULL; + } - SpinLockRelease(ProcStructLock); + /* + * If we were never a lock group leader or have managed to give up that + * designation, then we can immediately release our PGPROC. If not, then + * the last group member will do that on exit. 
+ */ + if (LockGroupLeader == NULL) + { + PGPROC * volatile * procgloballist; + +#ifdef USE_ASSERT_CHECKING + { + int i; + + /* Last process should have released all locks. */ + for (i = 0; i < NUM_LOCK_PARTITIONS; i++) + Assert(SHMQueueEmpty(&(proc->myProcLocks[i]))); + } +#endif + + procgloballist = proc->procgloballist; + SpinLockAcquire(ProcStructLock); + + /* Return PGPROC structure (and semaphore) to appropriate freelist */ + proc->links.next = (SHM_QUEUE *) *procgloballist; + *procgloballist = proc; + + /* Update shared estimate of spins_per_delay */ + procglobal->spins_per_delay = + update_spins_per_delay(procglobal->spins_per_delay); + + SpinLockRelease(ProcStructLock); + } /* * This process is no longer present in shared memory in any meaningful @@ -957,18 +1037,53 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) bool early_deadlock = false; bool allow_autovacuum_cancel = true; int myWaitStatus; - PGPROC *proc; + PGPROC *proc = NULL; int i; + PGPROC *groupLeader = LockGroupLeader; + + /* + * Ignore trivial lock groups. + * + * We read MyProc->lockGroupMembers here without a lock. The read itself + * is atomic; while the value could be changing under us, it can't change + * from a value < 2 to a value >= 2 while any group locks are actually + * present. Similarly, when iterating over the wait queue, we needn't + * worry that the lock group membership of a process will change under us: + * that's not allowed while a process holds any locks. + */ + if (MyProc == groupLeader && MyProc->lockGroupMembers < 2) + groupLeader = NULL; /* * Determine where to add myself in the wait queue. * * Normally I should go at the end of the queue. However, if I already - * hold locks that conflict with the request of any previous waiter, put - * myself in the queue just in front of the first such waiter.
This is not - * a necessary step, since deadlock detection would move me to before that - * waiter anyway; but it's relatively cheap to detect such a conflict - * immediately, and avoid delaying till deadlock timeout. + * Normally I should go at the end of the queue. However, if I'm a + * member of a lock group, and some other member of the lock group is + * already waiting for a lock, then add myself just after the + * existing waiters. This is necessary for correctness; any code that + * scans the wait queue is entitled to assume that lockers from the same + * group are in consecutive positions in the queue. + */ + if (groupLeader != NULL) + { + PGPROC *cproc = (PGPROC *) waitQueue->links.next; + + for (i = 0; i < waitQueue->size; i++) + { + if (cproc->lockGroupLeader == LockGroupLeader) + proc = cproc; + else if (proc != NULL) + break; + cproc = (PGPROC *) cproc->links.next; + } + } + + /* + * If I already hold locks that conflict with the request of any previous + * waiter, put myself in the queue just in front of the first such waiter. + * This is not a necessary step, since deadlock detection would move me + * to before that waiter anyway; but it's relatively cheap to detect such + * a conflict immediately, and avoid delaying till deadlock timeout. * * Special case: if I find I should go in front of some waiter, check to * see if I conflict with already-held locks or the requests before that @@ -979,16 +1094,24 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) */ if (myHeldLocks != 0) { + PGPROC *cproc = (PGPROC *) waitQueue->links.next; LOCKMASK aheadRequests = 0; - proc = (PGPROC *) waitQueue->links.next; for (i = 0; i < waitQueue->size; i++) { + /* + * If we reached our own lock group in the wait queue without + * finding a conflict, we aren't going to find one at all prior + * to the insertion point, so bail out. + */ + if (groupLeader != NULL && cproc->lockGroupLeader == groupLeader) + break; + /* Must he wait for me?
*/ - if (lockMethodTable->conflictTab[proc->waitLockMode] & myHeldLocks) + if (lockMethodTable->conflictTab[cproc->waitLockMode] & myHeldLocks) { /* Must I wait for him ? */ - if (lockMethodTable->conflictTab[lockmode] & proc->heldLocks) + if (lockMethodTable->conflictTab[lockmode] & cproc->heldLocks) { /* * Yes, so we have a deadlock. Easiest way to clean up @@ -997,7 +1120,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * a flag to check below, and break out of loop. Also, * record deadlock info for later message. */ - RememberSimpleDeadLock(MyProc, lockmode, lock, proc); + RememberSimpleDeadLock(MyProc, lockmode, lock, cproc); early_deadlock = true; break; } @@ -1013,22 +1136,19 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) GrantAwaitedLock(); return STATUS_OK; } - /* Break out of loop to put myself before him */ + /* Break out of loop and put myself before him */ + proc = cproc; break; } /* Nope, so advance to next waiter */ - aheadRequests |= LOCKBIT_ON(proc->waitLockMode); - proc = (PGPROC *) proc->links.next; + aheadRequests |= LOCKBIT_ON(cproc->waitLockMode); + cproc = (PGPROC *) cproc->links.next; } - - /* - * If we fall out of loop normally, proc points to waitQueue head, so - * we will insert at tail of queue as desired. - */ } - else + + if (proc == NULL) { - /* I hold no locks, so I can't push in front of anyone. */ + /* No special case applies, so I can't push in front of anyone. */ proc = (PGPROC *) &(waitQueue->links); } @@ -1611,6 +1731,71 @@ check_done: LWLockRelease(LockHashPartitionLockByIndex(i)); } +/* + * BecomeLockGroupLeader - designate process as lock group leader + * + * Once this function has returned, other processes can join the lock group + * by calling BecomeLockGroupFollower. + */ +void +BecomeLockGroupLeader(void) +{ + /* Can't be leader and follower. */ + Assert(LockGroupLeader == NULL || LockGroupLeader == MyProc); + + /* This can be called more than once; but we must not redo the work.
*/ + if (LockGroupLeader == NULL) + { + LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE); + Assert(MyProc->lockGroupMembers == 0); + Assert(MyProc->lockGroupLeader == NULL); + MyProc->lockGroupLeader = MyProc; + MyProc->lockGroupLeaderIdentifier = MyProcPid; + MyProc->lockGroupMembers = 1; + LWLockRelease(MyProc->backendLock); + } +} + +/* + * BecomeLockGroupFollower - designate process as lock group follower + * + * This is pretty straightforward except for the possibility that the leader + * whose group we're trying to join might exit before we manage to do so; + * and the PGPROC might get recycled for an unrelated process. To avoid + * that, we require the caller to pass the PID of the intended PGPROC as + * an interlock. Returns true if we successfully join the intended lock + * group, and false if not. + */ +bool +BecomeLockGroupFollower(PGPROC *leader, int pid) +{ + bool ok = false; + + /* Can't become a follower if we're already in a lock group. */ + Assert(LockGroupLeader == NULL); + + /* Can't follow ourselves. */ + Assert(MyProc != leader); + + /* Try to join the group. */ + LWLockAcquire(leader->backendLock, LW_EXCLUSIVE); + if (leader->lockGroupMembers > 0 && + leader->lockGroupLeaderIdentifier == pid) + { + ok = true; + leader->lockGroupMembers++; + LockGroupLeader = leader; + } + LWLockRelease(leader->backendLock); + + /* Advertise our new leader. */ + LWLockAcquire(MyProc->backendLock, LW_EXCLUSIVE); + MyProc->lockGroupLeader = leader; + LWLockRelease(MyProc->backendLock); + + return ok; +} + /* * ProcWaitForSignal - wait for a signal from another backend.
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 4c49e3c..b15addb 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -362,6 +362,7 @@ typedef struct PROCLOCK PROCLOCKTAG tag; /* unique identifier of proclock object */ /* data */ + PGPROC *groupLeader; /* group leader, or NULL if no lock group */ LOCKMASK holdMask; /* bitmask for lock types currently held */ LOCKMASK releaseMask; /* bitmask for lock types to be released */ SHM_QUEUE lockLink; /* list link in LOCK's list of proclocks */ @@ -473,7 +474,6 @@ typedef enum * worker */ } DeadLockState; - /* * The lockmgr's shared hash tables are partitioned to reduce contention. * To determine which partition a given locktag belongs to, compute the tag's diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index f2fec32..6f842f7 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -143,6 +143,11 @@ struct PGPROC bool fpVXIDLock; /* are we holding a fast-path VXID lock? */ LocalTransactionId fpLocalTransactionId; /* lxid for fast-path VXID * lock */ + + /* Support for lock groups. */ + int lockGroupMembers; /* 0 if not a lock group leader */ + int lockGroupLeaderIdentifier; /* MyProcPid, if I'm a leader */ + PGPROC *lockGroupLeader; /* lock group leader, if I'm a follower */ }; /* NOTE: "typedef struct PGPROC PGPROC" appears in storage/lock.h. 
*/ @@ -150,6 +155,7 @@ struct PGPROC extern PGDLLIMPORT PGPROC *MyProc; extern PGDLLIMPORT struct PGXACT *MyPgXact; +extern PGDLLIMPORT PGPROC *LockGroupLeader; /* * Prior to PostgreSQL 9.2, the fields below were stored as part of the @@ -254,6 +260,8 @@ extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern void CheckDeadLock(void); extern bool IsWaitingForLock(void); extern void LockErrorCleanup(void); +extern void BecomeLockGroupLeader(void); +extern bool BecomeLockGroupFollower(PGPROC *leader, int pid); extern void ProcWaitForSignal(void); extern void ProcSendSignal(int pid); -- 1.7.9.6 (Apple Git-31.1)
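The group-aware part of LockCheckConflicts() in the patch above is essentially a counting argument: count granted locks in each mode that conflicts with the request, subtract the requester's own holdings, then subtract modes held by other members of the same lock group; only if something remains is there a real conflict. A minimal standalone model of that counting follows. It is a sketch, not the patch's code: Holder and group_lock_conflicts() are hypothetical names, an integer group id (0 meaning "no group") stands in for the groupLeader pointer, and it assumes 8 lock modes numbered from 1 with the bit for mode m at 1 << m, matching the LOCKBIT_ON() convention in lock.c.

```c
#include <assert.h>
#include <stdbool.h>

#define NUM_MODES 8				/* assumption: modes 1..8, as in the default lock method */
#define MODEBIT(m) (1 << (m))	/* same bit layout as LOCKBIT_ON() */

/* Hypothetical, simplified stand-in for a PROCLOCK. */
typedef struct Holder
{
	int		group;			/* lock group id; 0 means "not in a lock group" */
	int		holdMask;		/* MODEBIT(m) set for each mode held */
} Holder;

/* Returns true if holders[me] has a real conflict requesting conflictMask. */
static bool
group_lock_conflicts(const Holder *holders, int nholders, int me,
					 int conflictMask)
{
	int		remaining[NUM_MODES + 1] = {0};
	int		total = 0;

	/* Per-mode count of granted locks, like lock->granted[]. */
	for (int h = 0; h < nholders; h++)
		for (int m = 1; m <= NUM_MODES; m++)
			if (holders[h].holdMask & MODEBIT(m))
				remaining[m]++;

	/* Keep only conflicting modes, and subtract my own holdings. */
	for (int m = 1; m <= NUM_MODES; m++)
	{
		if ((conflictMask & MODEBIT(m)) == 0)
		{
			remaining[m] = 0;
			continue;
		}
		if (holders[me].holdMask & MODEBIT(m))
			remaining[m]--;
		total += remaining[m];
	}

	if (total == 0)
		return false;			/* nothing left: no conflict */
	if (holders[me].group == 0)
		return true;			/* not in a lock group: a real conflict */

	/* Subtract conflicting modes held by other members of my group. */
	for (int h = 0; h < nholders; h++)
	{
		int		intersect;

		if (h == me || holders[h].group != holders[me].group)
			continue;
		intersect = holders[h].holdMask & conflictMask;
		for (int m = 1; m <= NUM_MODES; m++)
		{
			if ((intersect & MODEBIT(m)) == 0)
				continue;
			assert(remaining[m] > 0);	/* lock.c PANICs here instead */
			remaining[m]--;
			total--;
		}
		if (total == 0)
			return false;		/* every conflict is within my group */
	}
	return true;				/* a holder outside my group conflicts */
}
```

With mode 8 (AccessExclusiveLock) held by a member of group 1, a second group-1 member requesting mode 8 gets no conflict, while a group-2 holder does conflict. That is the intended difference between naming the test workers 1.0/1.1 and 1.0/2.0.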
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index ea88a24..de7608a 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -240,18 +240,21 @@ InitProcGlobal(void)
 			/* PGPROC for normal backend, add to freeProcs list */
 			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->freeProcs;
 			ProcGlobal->freeProcs = &procs[i];
+			procs[i].procgloballist = &ProcGlobal->freeProcs;
 		}
 		else if (i < MaxConnections + autovacuum_max_workers + 1)
 		{
 			/* PGPROC for AV launcher/worker, add to autovacFreeProcs list */
 			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->autovacFreeProcs;
 			ProcGlobal->autovacFreeProcs = &procs[i];
+			procs[i].procgloballist = &ProcGlobal->autovacFreeProcs;
 		}
 		else if (i < MaxBackends)
 		{
 			/* PGPROC for bgworker, add to bgworkerFreeProcs list */
 			procs[i].links.next = (SHM_QUEUE *) ProcGlobal->bgworkerFreeProcs;
 			ProcGlobal->bgworkerFreeProcs = &procs[i];
+			procs[i].procgloballist = &ProcGlobal->bgworkerFreeProcs;
 		}

 		/* Initialize myProcLocks[] shared memory queues. */
@@ -279,6 +282,7 @@ InitProcess(void)
 {
 	/* use volatile pointer to prevent code rearrangement */
 	volatile PROC_HDR *procglobal = ProcGlobal;
+	PGPROC * volatile * procgloballist;

 	/*
 	 * ProcGlobal should be set up already (if we are a backend, we inherit
@@ -297,9 +301,17 @@ InitProcess(void)
 	 */
 	InitializeLatchSupport();

+	/* Decide which list should supply our PGPROC. */
+	if (IsAnyAutoVacuumProcess())
+		procgloballist = &procglobal->autovacFreeProcs;
+	else if (IsBackgroundWorker)
+		procgloballist = &procglobal->bgworkerFreeProcs;
+	else
+		procgloballist = &procglobal->freeProcs;
+
 	/*
-	 * Try to get a proc struct from the free list.  If this fails, we must be
-	 * out of PGPROC structures (not to mention semaphores).
+	 * Try to get a proc struct from the appropriate free list.  If this
+	 * fails, we must be out of PGPROC structures (not to mention semaphores).
 	 *
 	 * While we are holding the ProcStructLock, also copy the current shared
 	 * estimate of spins_per_delay to local storage.
@@ -308,21 +320,11 @@ InitProcess(void)

 	set_spins_per_delay(procglobal->spins_per_delay);

-	if (IsAnyAutoVacuumProcess())
-		MyProc = procglobal->autovacFreeProcs;
-	else if (IsBackgroundWorker)
-		MyProc = procglobal->bgworkerFreeProcs;
-	else
-		MyProc = procglobal->freeProcs;
+	MyProc = *procgloballist;

 	if (MyProc != NULL)
 	{
-		if (IsAnyAutoVacuumProcess())
-			procglobal->autovacFreeProcs = (PGPROC *) MyProc->links.next;
-		else if (IsBackgroundWorker)
-			procglobal->bgworkerFreeProcs = (PGPROC *) MyProc->links.next;
-		else
-			procglobal->freeProcs = (PGPROC *) MyProc->links.next;
+		*procgloballist = (PGPROC *) MyProc->links.next;
 		SpinLockRelease(ProcStructLock);
 	}
 	else
@@ -341,6 +343,12 @@ InitProcess(void)
 	MyPgXact = &ProcGlobal->allPgXact[MyProc->pgprocno];

 	/*
+	 * Cross-check that the PGPROC is of the type we expect; if this were
+	 * not the case, it would get returned to the wrong list.
+	 */
+	Assert(MyProc->procgloballist == procgloballist);
+
+	/*
 	 * Now that we have a PGPROC, mark ourselves as an active postmaster
 	 * child; this is so that the postmaster can detect it if we exit without
 	 * cleaning up.  (XXX autovac launcher currently doesn't participate in
@@ -774,6 +782,7 @@ ProcKill(int code, Datum arg)
 	/* use volatile pointer to prevent code rearrangement */
 	volatile PROC_HDR *procglobal = ProcGlobal;
 	PGPROC	   *proc;
+	PGPROC * volatile * procgloballist;

 	Assert(MyProc != NULL);

@@ -810,24 +819,12 @@ ProcKill(int code, Datum arg)
 	MyProc = NULL;
 	DisownLatch(&proc->procLatch);

+	procgloballist = proc->procgloballist;
 	SpinLockAcquire(ProcStructLock);

 	/* Return PGPROC structure (and semaphore) to appropriate freelist */
-	if (IsAnyAutoVacuumProcess())
-	{
-		proc->links.next = (SHM_QUEUE *) procglobal->autovacFreeProcs;
-		procglobal->autovacFreeProcs = proc;
-	}
-	else if (IsBackgroundWorker)
-	{
-		proc->links.next = (SHM_QUEUE *) procglobal->bgworkerFreeProcs;
-		procglobal->bgworkerFreeProcs = proc;
-	}
-	else
-	{
-		proc->links.next = (SHM_QUEUE *) procglobal->freeProcs;
-		procglobal->freeProcs = proc;
-	}
+	proc->links.next = (SHM_QUEUE *) *procgloballist;
+	*procgloballist = proc;

 	/* Update shared estimate of spins_per_delay */
 	procglobal->spins_per_delay = update_spins_per_delay(procglobal->spins_per_delay);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index c23f4da..f2fec32 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -77,6 +77,7 @@ struct PGPROC
 {
 	/* proc->links MUST BE FIRST IN STRUCT (see ProcSleep,ProcWakeup,etc) */
 	SHM_QUEUE	links;			/* list link if process is in a list */
+	PGPROC	  **procgloballist;	/* procglobal list that owns this PGPROC */

 	PGSemaphoreData sem;		/* ONE semaphore to sleep on */
 	int			waitStatus;		/* STATUS_WAITING, STATUS_OK or STATUS_ERROR */
-- 
1.7.9.6 (Apple Git-31.1)
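The idea behind revise-procglobal-tracking.patch is that each PGPROC records a pointer to the head of the free list that owns it. InitProcess() then pops from whichever list it chose, and ProcKill() pushes the PGPROC back onto `*proc->procgloballist` without re-deriving the process type. The sketch below shows that technique in isolation. It is a minimal, single-threaded illustration under stated assumptions: toy_proc, toy_list_add, toy_acquire and toy_release are hypothetical names, and the ProcStructLock spinlock and volatile qualifiers the real code relies on are deliberately left out.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for PGPROC. */
typedef struct toy_proc
{
	struct toy_proc *next;			/* free-list link (PGPROC uses links.next) */
	struct toy_proc **owner_list;	/* free list that owns this slot */
} toy_proc;

/* Startup: push a slot onto a list and remember that list as its owner. */
static void
toy_list_add(toy_proc **list, toy_proc *proc)
{
	proc->next = *list;
	*list = proc;
	proc->owner_list = list;
}

/* Acquire: pop the head of the chosen list; NULL if the list is empty. */
static toy_proc *
toy_acquire(toy_proc **list)
{
	toy_proc   *proc = *list;

	if (proc != NULL)
	{
		*list = proc->next;
		/* Same cross-check as the Assert the patch adds to InitProcess(). */
		assert(proc->owner_list == list);
	}
	return proc;
}

/* Release: push the slot back onto whichever list owns it. */
static void
toy_release(toy_proc *proc)
{
	toy_proc  **list = proc->owner_list;

	proc->next = *list;
	*list = proc;
}
```

Because the owning list travels with the slot, the release path needs no copy of the "autovacuum / bgworker / regular backend" decision. A slot can therefore never be returned to a different list from the one it came from, even if the process's idea of its own type is wrong at exit time.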
diff --git a/contrib/test_group_locking/Makefile b/contrib/test_group_locking/Makefile
new file mode 100644
index 0000000..2d09341
--- /dev/null
+++ b/contrib/test_group_locking/Makefile
@@ -0,0 +1,21 @@
+# contrib/test_group_locking/Makefile
+
+MODULE_big = test_group_locking
+OBJS = test_group_locking.o $(WIN32RES)
+PGFILEDESC = "test_group_locking - test harness for group locking"
+
+EXTENSION = test_group_locking
+DATA = test_group_locking--1.0.sql
+
+REGRESS = test_group_locking
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/test_group_locking
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/test_group_locking/test_group_locking--1.0.sql b/contrib/test_group_locking/test_group_locking--1.0.sql
new file mode 100644
index 0000000..adb2be5
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking--1.0.sql
@@ -0,0 +1,8 @@
+/* contrib/test_group_locking/test_group_locking--1.0.sql */
+
+-- complain if script is sourced in psql, rather than via CREATE EXTENSION
+\echo Use "CREATE EXTENSION test_group_locking" to load this file. \quit
+
+CREATE FUNCTION test_group_locking(spec pg_catalog.text)
+    RETURNS pg_catalog.void STRICT
+	AS 'MODULE_PATHNAME' LANGUAGE C;
diff --git a/contrib/test_group_locking/test_group_locking.c b/contrib/test_group_locking/test_group_locking.c
new file mode 100644
index 0000000..36f1277
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking.c
@@ -0,0 +1,1069 @@
+/*--------------------------------------------------------------------------
+ *
+ * test_group_locking.c
+ *		Test harness code for group locking.
+ *
+ * Copyright (C) 2013, PostgreSQL Global Development Group
+ *
+ * IDENTIFICATION
+ *		contrib/test_group_locking/test_group_locking.c
+ *
+ * -------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xact.h"
+#include "catalog/namespace.h"
+#include "commands/dbcommands.h"
+#include "fmgr.h"
+#include "lib/ilist.h"
+#include "lib/stringinfo.h"
+#include "libpq/libpq.h"
+#include "libpq/pqformat.h"
+#include "libpq/pqmq.h"
+#include "mb/pg_wchar.h"
+#include "miscadmin.h"
+#include "nodes/makefuncs.h"
+#include "parser/scansup.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/procarray.h"
+#include "storage/shm_mq.h"
+#include "storage/shm_toc.h"
+#include "utils/builtins.h"
+#include "utils/hsearch.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(test_group_locking);
+
+void		test_group_locking_worker_main(Datum main_arg);
+
+/* Names of lock modes, for debug printouts */
+static const char *const lock_mode_names[] =
+{
+	"INVALID",
+	"AccessShareLock",
+	"RowShareLock",
+	"RowExclusiveLock",
+	"ShareUpdateExclusiveLock",
+	"ShareLock",
+	"ShareRowExclusiveLock",
+	"ExclusiveLock",
+	"AccessExclusiveLock"
+};
+
+typedef enum
+{
+	TGL_START,
+	TGL_STOP,
+	TGL_LOCK,
+	TGL_UNLOCK
+} TestGroupLockOp;
+
+typedef struct
+{
+	TestGroupLockOp op;
+	LOCKMODE	lockmode;
+	Oid			relid;
+} TestGroupLockCommand;
+
+typedef struct
+{
+	dlist_node	node;
+	bool		verified;
+	int			group_id;
+	int			task_id;
+	TestGroupLockCommand command;
+} TestGroupLockStep;
+
+typedef struct
+{
+	int			group_id;
+	int			task_id;
+} worker_key;
+
+typedef struct
+{
+	worker_key	key;
+	dsm_segment *seg;
+	BackgroundWorkerHandle *handle;
+	shm_mq_handle *requesth;
+	shm_mq_handle *responseh;
+	bool		awaiting_response;
+} worker_info;
+
+typedef struct
+{
+	int			group_id;
+	int			leader_task_id;
+	bool		has_followers;
+} leader_info;
+
+/* Fixed-size data passed via our dynamic shared memory segment. */
+typedef struct worker_fixed_data
+{
+	Oid			database_id;
+	Oid			authenticated_user_id;
+	NameData	database;
+	NameData	authenticated_user;
+	bool		use_group_locking;
+	pid_t		leader_pid;
+} worker_fixed_data;
+
+#define SHM_QUEUE_SIZE	32768
+#define TEST_GROUP_LOCKING_MAGIC	0x4c616e65
+
+static void check_for_messages(HTAB *worker_hash);
+static void determine_leader_info(dlist_head *plan, HTAB *leader_hash);
+static void handle_sigterm(SIGNAL_ARGS);
+static void process_message(HTAB *worker_hash, worker_info *info,
+				char *message, Size message_bytes);
+static void rationalize_steps(dlist_head *plan);
+static void rationalize_steps_for_task(dlist_head *plan, int group_id,
+						   int task_id);
+static void report_syntax_error(StringInfo buf);
+static bool scan_character(StringInfo buf, char c);
+static bool scan_eof(StringInfo buf);
+static char *scan_identifier(StringInfo buf);
+static bool scan_integer(StringInfo buf, int *result);
+static LOCKMODE scan_lockmode(StringInfo buf);
+static TestGroupLockOp scan_op(StringInfo buf);
+static RangeVar *scan_qualified_identifier(StringInfo buf);
+static char *scan_quoted_identifier(StringInfo buf);
+static void send_command(HTAB *worker_hash, TestGroupLockStep *step);
+static void start_worker(HTAB *worker_hash, int group_id, int task_id,
+			 int leader_task_id);
+
+/*--------------------------------------------------------------------------
+ * Main entrypoint.
+ *
+ * Start background workers and have them issue lock requests against
+ * specified relations.  We use a little mini-language to control this:
+ *
+ * N[.M]:start
+ * N[.M]:stop
+ * N[.M]:lock:lockmode:relation
+ * N[.M]:unlock:lockmode:relation
+ *
+ * N and M should be integers.  M can be omitted, in which case it defaults
+ * to 0.  Each (N, M) pair identifies a separate worker; those with the
+ * same value of N are in the same lock group.  All workers not started
+ * explicitly are started before any other actions are taken; and all
+ * workers not terminated explicitly are terminated after all other actions
+ * are taken.
+ *--------------------------------------------------------------------------
+ */
+Datum
+test_group_locking(PG_FUNCTION_ARGS)
+{
+	text	   *spec = PG_GETARG_TEXT_PP(0);
+	StringInfo	buf = makeStringInfo();
+	dlist_head	plan;
+	dlist_iter	iter;
+	HASHCTL		hashctl;
+	HTAB	   *worker_hash;
+	HTAB	   *leader_hash;
+
+	appendBinaryStringInfo(buf, VARDATA_ANY(spec), VARSIZE_ANY_EXHDR(spec));
+	dlist_init(&plan);
+
+	/* Parse the user-provided specification. */
+	for (;;)
+	{
+		TestGroupLockStep *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		if (!scan_integer(buf, &step->group_id))
+			report_syntax_error(buf);
+		if (scan_character(buf, '.') && !scan_integer(buf, &step->task_id))
+			report_syntax_error(buf);
+		if (!scan_character(buf, ':'))
+			report_syntax_error(buf);
+		step->command.op = scan_op(buf);
+
+		if (step->command.op == TGL_LOCK || step->command.op == TGL_UNLOCK)
+		{
+			RangeVar   *rv;
+			if (!scan_character(buf, ':'))
+				report_syntax_error(buf);
+			step->command.lockmode = scan_lockmode(buf);
+			if (!scan_character(buf, ':'))
+				report_syntax_error(buf);
+			rv = scan_qualified_identifier(buf);
+
+			/*
+			 * Since we're trying to test locking here, don't take a lock
+			 * when looking up the relation.  That's unsafe in the presence
+			 * of concurrent DDL, but since this is just test code, we don't
+			 * care.
+			 */
+			step->command.relid = RangeVarGetRelid(rv, NoLock, false);
+		}
+
+		dlist_push_tail(&plan, &step->node);
+
+		if (scan_eof(buf))
+			break;
+		if (!scan_character(buf, ','))
+			report_syntax_error(buf);
+	}
+
+	/* Make sure the series of steps looks sensible. */
+	rationalize_steps(&plan);
+
+	/* Initialize worker hash table. */
+	memset(&hashctl, 0, sizeof(HASHCTL));
+	hashctl.keysize = sizeof(worker_key);
+	hashctl.entrysize = sizeof(worker_info);
+	hashctl.hcxt = CurrentMemoryContext;
+	hashctl.hash = tag_hash;
+	worker_hash = hash_create("test_group_locking workers", 16, &hashctl,
+							  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	/* Initialize leader hash table. */
+	memset(&hashctl, 0, sizeof(HASHCTL));
+	hashctl.keysize = sizeof(int);
+	hashctl.entrysize = sizeof(leader_info);
+	hashctl.hcxt = CurrentMemoryContext;
+	hashctl.hash = tag_hash;
+	leader_hash = hash_create("test_group_locking leaders", 16, &hashctl,
+							  HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
+
+	/* Determine group leadership information. */
+	determine_leader_info(&plan, leader_hash);
+
+	/* Execute the plan. */
+	dlist_foreach(iter, &plan)
+	{
+		TestGroupLockStep *step;
+
+		step = dlist_container(TestGroupLockStep, node, iter.cur);
+
+		if (step->command.op == TGL_START)
+		{
+			leader_info *li;
+
+			li = hash_search(leader_hash, &step->group_id, HASH_FIND, NULL);
+			Assert(li != NULL);
+			start_worker(worker_hash, step->group_id, step->task_id,
+						 li->has_followers ? li->leader_task_id : -1);
+			continue;
+		}
+
+		send_command(worker_hash, step);
+	}
+
+	PG_RETURN_VOID();
+}
+
+/* Check for messages from our workers. */
+static void
+check_for_messages(HTAB *worker_hash)
+{
+	bool		progress = true;
+
+	while (progress)
+	{
+		HASH_SEQ_STATUS hash_seq;
+		worker_info *info;
+
+		progress = false;
+		hash_seq_init(&hash_seq, worker_hash);
+		while ((info = hash_seq_search(&hash_seq)) != NULL)
+		{
+			shm_mq_result result;
+			Size		nbytes;
+			void	   *data;
+
+			result = shm_mq_receive(info->responseh, &nbytes, &data, true);
+			if (result == SHM_MQ_DETACHED)
+				ereport(ERROR,
+						(errcode(ERRCODE_CONNECTION_FAILURE),
+						 errmsg("connection to background worker %d.%d lost",
+								info->key.group_id, info->key.task_id)));
+			if (result == SHM_MQ_SUCCESS)
+			{
+				progress = true;
+				process_message(worker_hash, info, data, nbytes);
+			}
+		}
+	}
+}
+
+/* Determine leadership information for each group. */
+static void
+determine_leader_info(dlist_head *plan, HTAB *leader_hash)
+{
+	dlist_iter	iter;
+
+	dlist_foreach(iter, plan)
+	{
+		TestGroupLockStep *step;
+		leader_info *li;
+		bool		found;
+
+		step = dlist_container(TestGroupLockStep, node, iter.cur);
+		li = hash_search(leader_hash, &step->group_id, HASH_ENTER, &found);
+		if (!found)
+		{
+			/* HASH_ENTER initializes only the key, so set both fields. */
+			li->leader_task_id = step->task_id;
+			li->has_followers = false;
+		}
+		else if (step->task_id != li->leader_task_id)
+			li->has_followers = true;
+	}
+}
+
+/* Error context callback. */
+static void
+error_callback(void *arg)
+{
+	worker_info *info = arg;
+
+	errcontext("background worker, group %d, task %d", info->key.group_id,
+			   info->key.task_id);
+}
+
+/* Handle SIGTERM. */
+static void
+handle_sigterm(SIGNAL_ARGS)
+{
+	int			save_errno = errno;
+
+	if (MyProc)
+		SetLatch(&MyProc->procLatch);
+
+	if (!proc_exit_inprogress)
+	{
+		InterruptPending = true;
+		ProcDiePending = true;
+	}
+
+	errno = save_errno;
+}
+
+/*
+ * Make sure we have a rational series of steps, and add missing start and
+ * stop steps as needed.
+ */
+static void
+rationalize_steps(dlist_head *plan)
+{
+	bool		progress = true;
+
+	while (progress)
+	{
+		dlist_iter	iter;
+		progress = false;
+
+		dlist_foreach(iter, plan)
+		{
+			TestGroupLockStep *step;
+
+			step = dlist_container(TestGroupLockStep, node, iter.cur);
+			if (!step->verified)
+			{
+				rationalize_steps_for_task(plan, step->group_id,
+										   step->task_id);
+				progress = true;
+				break;
+			}
+		}
+	}
+}
+
+/*
+ * Linear search through the provided list of steps.  Figure out whether any
+ * start action is unique and precedes all other actions for this task, and
+ * whether any stop action is unique and follows all other such actions.  If
+ * the steps are out of order, error; if they are missing, add them at the
+ * beginning and end as appropriate.
+ */
+static void
+rationalize_steps_for_task(dlist_head *plan, int group_id, int task_id)
+{
+	dlist_iter	iter;
+	bool		saw_start = false;
+	bool		saw_stop = false;
+	bool		saw_other = false;
+
+	dlist_foreach(iter, plan)
+	{
+		TestGroupLockStep *step;
+
+		step = dlist_container(TestGroupLockStep, node, iter.cur);
+		if (step->group_id != group_id || step->task_id != task_id)
+			continue;
+		step->verified = true;
+
+		switch (step->command.op)
+		{
+			case TGL_START:
+				if (saw_start)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("can't start same worker more than once")));
+				if (saw_stop)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("can't start worker after stopping it")));
+				if (saw_other)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("worker can't perform actions before being started")));
+				saw_start = true;
+				break;
+			case TGL_STOP:
+				if (saw_stop)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("can't stop same worker more than once")));
+				saw_stop = true;
+				break;
+			default:
+				if (saw_stop)
+					ereport(ERROR,
+							(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+							 errmsg("worker can't perform actions after being stopped")));
+				saw_other = true;
+				break;
+		}
+	}
+
+	if (!saw_start)
+	{
+		TestGroupLockStep *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		step->group_id = group_id;
+		step->task_id = task_id;
+		step->command.op = TGL_START;
+		step->verified = true;
+		dlist_push_head(plan, &step->node);
+	}
+
+	if (!saw_stop)
+	{
+		TestGroupLockStep *step;
+
+		step = (TestGroupLockStep *) palloc0(sizeof(TestGroupLockStep));
+		step->group_id = group_id;
+		step->task_id = task_id;
+		step->command.op = TGL_STOP;
+		step->verified = true;
+		dlist_push_tail(plan, &step->node);
+	}
+}
+
+/* Report a syntax error. */
+static void
+report_syntax_error(StringInfo buf)
+{
+	char		badchar[MAX_MULTIBYTE_CHAR_LEN + 1];
+	int			badpos = pg_mbstrlen_with_len(buf->data, buf->cursor) + 1;
+	int			badcharlen;
+
+	if (buf->cursor >= buf->len)
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("unexpected end of string at position %d", badpos)));
+
+	badcharlen = pg_mblen(&buf->data[buf->cursor]);
+	memcpy(badchar, &buf->data[buf->cursor], badcharlen);
+	badchar[badcharlen] = '\0';
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("unexpected character \"%s\" at position %d",
+					badchar, badpos)));
+}
+
+/* Scan a given character. */
+static bool
+scan_character(StringInfo buf, char c)
+{
+	if (buf->cursor < buf->len && buf->data[buf->cursor] == c)
+	{
+		++buf->cursor;
+		return true;
+	}
+	return false;
+}
+
+/* Scan end-of-buffer. */
+static bool
+scan_eof(StringInfo buf)
+{
+	return buf->cursor >= buf->len;
+}
+
+/* Scan and return a single-part PostgreSQL identifier. */
+static char *
+scan_identifier(StringInfo buf)
+{
+	int			start = buf->cursor;
+
+	if (buf->data[start] == '"')
+		return scan_quoted_identifier(buf);
+
+	while (buf->cursor < buf->len)
+	{
+		int			len = pg_mblen(&buf->data[buf->cursor]);
+		char		c;
+
+		/* Multibyte characters are allowed. */
+		if (len != 1)
+		{
+			Assert(len > 0);
+			buf->cursor += len;
+			continue;
+		}
+
+		/* Alphabetic characters, and underscore, are allowed. */
+		c = buf->data[buf->cursor];
+		if ((c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z') || c == '_')
+		{
+			buf->cursor++;
+			continue;
+		}
+
+		/* Numeric digits, and $, are allowed but not at character 1. */
+		if (start != buf->cursor && ((c >= '0' && c <= '9') || c == '$'))
+		{
+			buf->cursor++;
+			continue;
+		}
+
+		/* Anything else is not allowed. */
+		break;
+	}
+
+	/* Return NULL if we didn't find an identifier. */
+	if (buf->cursor == start)
+		return NULL;
+
+	/* Return a copy of the identifier with appropriate case-folding. */
+	return downcase_truncate_identifier(&buf->data[start], buf->cursor - start,
+										false);
+}
+
+/* Scan an integer. */
+static bool
+scan_integer(StringInfo buf, int *result)
+{
+	int			start = buf->cursor;
+	int			val = 0;
+
+	while (buf->cursor < buf->len)
+	{
+		char		c = buf->data[buf->cursor];
+
+		if (c < '0' || c > '9')
+			break;
+		val = val * 10 + (c - '0');
+		++buf->cursor;
+	}
+
+	if (buf->cursor == start)
+		return false;
+	*result = val;
+	return true;
+}
+
+/* Scan and return a lock mode. */
+static LOCKMODE
+scan_lockmode(StringInfo buf)
+{
+	char	   *mode = scan_identifier(buf);
+
+	if (mode == NULL)
+		report_syntax_error(buf);
+
+	if (pg_strcasecmp(mode, "AccessShareLock") == 0)
+		return AccessShareLock;
+	else if (pg_strcasecmp(mode, "RowShareLock") == 0)
+		return RowShareLock;
+	else if (pg_strcasecmp(mode, "RowExclusiveLock") == 0)
+		return RowExclusiveLock;
+	else if (pg_strcasecmp(mode, "ShareUpdateExclusiveLock") == 0)
+		return ShareUpdateExclusiveLock;
+	else if (pg_strcasecmp(mode, "ShareLock") == 0)
+		return ShareLock;
+	else if (pg_strcasecmp(mode, "ShareRowExclusiveLock") == 0)
+		return ShareRowExclusiveLock;
+	else if (pg_strcasecmp(mode, "ExclusiveLock") == 0)
+		return ExclusiveLock;
+	else if (pg_strcasecmp(mode, "AccessExclusiveLock") == 0)
+		return AccessExclusiveLock;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("invalid lock mode: \"%s\"", mode)));
+}
+
+/* Scan and return an operation name. */
+static TestGroupLockOp
+scan_op(StringInfo buf)
+{
+	char	   *opname = scan_identifier(buf);
+
+	if (opname == NULL)
+		report_syntax_error(buf);
+
+	if (pg_strcasecmp(opname, "start") == 0)
+		return TGL_START;
+	else if (pg_strcasecmp(opname, "stop") == 0)
+		return TGL_STOP;
+	else if (pg_strcasecmp(opname, "lock") == 0)
+		return TGL_LOCK;
+	else if (pg_strcasecmp(opname, "unlock") == 0)
+		return TGL_UNLOCK;
+
+	ereport(ERROR,
+			(errcode(ERRCODE_SYNTAX_ERROR),
+			 errmsg("invalid operation name: \"%s\"", opname)));
+}
+
+/* Scan and return a possibly schema-qualified identifier. */
+static RangeVar *
+scan_qualified_identifier(StringInfo buf)
+{
+	char	   *name1;
+	char	   *name2;
+
+	name1 = scan_identifier(buf);
+	if (name1 == NULL)
+		report_syntax_error(buf);
+	if (buf->data[buf->cursor] != '.')
+		return makeRangeVar(NULL, name1, -1);
+	buf->cursor++;
+	name2 = scan_identifier(buf);
+	if (name2 == NULL)
+		report_syntax_error(buf);
+	return makeRangeVar(name1, name2, -1);
+}
+
+/* Scan and return a quoted single-part identifier. */
+static char *
+scan_quoted_identifier(StringInfo buf)
+{
+	StringInfoData result;
+
+	initStringInfo(&result);
+
+	if (buf->data[buf->cursor] != '"')
+		return NULL;
+
+	while (++buf->cursor < buf->len)
+	{
+		char	   *s;
+
+		/* If we see a byte that is not a quote, append to result. */
+		s = &buf->data[buf->cursor];
+		if (s[0] != '"')
+		{
+			appendStringInfoChar(&result, s[0]);
+			continue;
+		}
+
+		/* If we see a byte that is a quote, check for a following quote. */
+		if (++buf->cursor < buf->len && s[1] == '"')
+		{
+			appendStringInfoChar(&result, s[0]);
+			continue;
+		}
+
+		/* We've found the terminating quote, so stop here. */
+		return result.data;
+	}
+
+	/* We ran off the end of the buffer with no close-quote.  Oops. */
+	return NULL;
+}
+
+/* Process a message from a background worker. */
+static void
+process_message(HTAB *worker_hash, worker_info *info, char *message,
+				Size message_bytes)
+{
+	StringInfoData msg;
+	char		msgtype;
+	const char *tag;
+
+	initStringInfo(&msg);
+	enlargeStringInfo(&msg, message_bytes);
+	appendBinaryStringInfo(&msg, message, message_bytes);
+	msgtype = pq_getmsgbyte(&msg);
+
+	if (msgtype == 'E' || msgtype == 'N')
+	{
+		ErrorData	edata;
+		ErrorContextCallback context;
+
+		pq_parse_errornotice(&msg, &edata);
+		edata.elevel = Min(edata.elevel, ERROR);
+		context.callback = error_callback;
+		context.arg = info;
+		context.previous = error_context_stack;
+		error_context_stack = &context;
+		ThrowErrorData(&edata);
+		error_context_stack = context.previous;
+		return;
+	}
+
+	/* Not error or notice, so must be command complete. */
+	if (msgtype != 'C')
+		elog(ERROR, "unknown message type: %c (%zu bytes)",
+			 msg.data[0], message_bytes);
+	tag = pq_getmsgstring(&msg);
+
+	/* Hopefully we were waiting for a response... */
+	if (!info->awaiting_response)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unexpected acknowledgement from worker %d.%d: \"%s\"",
+						info->key.group_id, info->key.task_id, tag)));
+	info->awaiting_response = false;
+
+	/* If the client indicates that it will stop, detach from it. */
+	if (strcmp(tag, "STOP") == 0)
+	{
+		dsm_detach(info->seg);
+		hash_search(worker_hash, &info->key, HASH_REMOVE, NULL);
+	}
+}
+
+/* Send a command to a background worker. */
+static void
+send_command(HTAB *worker_hash, TestGroupLockStep *step)
+{
+	worker_key	key;
+	bool		found;
+	worker_info *info;
+	shm_mq_result result;
+
+	key.group_id = step->group_id;
+	key.task_id = step->task_id;
+	info = hash_search(worker_hash, &key, HASH_FIND, &found);
+	Assert(found);
+
+	/* Display progress report. */
+	switch (step->command.op)
+	{
+		case TGL_STOP:
+			ereport(NOTICE,
+					(errmsg("stopping worker %d.%d",
+							step->group_id, step->task_id)));
+			break;
+
+		case TGL_LOCK:
+			ereport(NOTICE,
+					(errmsg("instructing worker %d.%d to acquire %s on relation with OID %u",
+							step->group_id, step->task_id,
+							lock_mode_names[step->command.lockmode],
+							step->command.relid)));
+			break;
+
+		case TGL_UNLOCK:
+			ereport(NOTICE,
+					(errmsg("instructing worker %d.%d to release %s on relation with OID %u",
+							step->group_id, step->task_id,
+							lock_mode_names[step->command.lockmode],
+							step->command.relid)));
+			break;
+		default:
+			elog(ERROR, "bad operation code: %d", (int) step->command.op);
+	}
+
+	/* Transmit command to worker. */
+	result = shm_mq_send(info->requesth, sizeof(TestGroupLockCommand),
+						 &step->command, false);
+	if (result != SHM_MQ_SUCCESS)
+		ereport(ERROR,
+				(errcode(ERRCODE_CONNECTION_FAILURE),
+				 errmsg("connection to background worker lost")));
+	info->awaiting_response = true;
+	for (;;)
+	{
+		check_for_messages(worker_hash);
+		if (!info->awaiting_response)
+			break;
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
+
+}
+
+/* Start a background worker. */
+static void
+start_worker(HTAB *worker_hash, int group_id, int task_id, int leader_task_id)
+{
+	worker_key	key;
+	worker_info *info;
+	bool		found;
+	shm_toc_estimator e;
+	Size		segsize;
+	shm_toc    *toc;
+	worker_fixed_data *fdata;
+	shm_mq	   *requestq;
+	shm_mq	   *responseq;
+	BackgroundWorker worker;
+
+	/* Set up entry in hash table. */
+	key.group_id = group_id;
+	key.task_id = task_id;
+	info = hash_search(worker_hash, &key, HASH_ENTER, &found);
+	Assert(!found);
+
+	/* Log a message explaining what we're going to do. */
+	if (leader_task_id < 0)
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d", group_id, task_id)));
+	else if (task_id == leader_task_id)
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d as group leader",
+						group_id, task_id)));
+	else
+		ereport(NOTICE,
+				(errmsg("starting worker %d.%d with group leader %d.%d",
+						group_id, task_id, group_id, leader_task_id)));
+
+	/* Create dynamic shared memory segment and table of contents. */
+	shm_toc_initialize_estimator(&e);
+	shm_toc_estimate_chunk(&e, sizeof(worker_fixed_data));
+	shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE);
+	shm_toc_estimate_chunk(&e, SHM_QUEUE_SIZE);
+	shm_toc_estimate_keys(&e, 3);
+	segsize = shm_toc_estimate(&e);
+	info->seg = dsm_create(segsize);
+	toc = shm_toc_create(TEST_GROUP_LOCKING_MAGIC,
+						 dsm_segment_address(info->seg), segsize);
+
+	/* Store fixed-size data in dynamic shared memory. */
+	fdata = shm_toc_allocate(toc, sizeof(worker_fixed_data));
+	fdata->database_id = MyDatabaseId;
+	fdata->authenticated_user_id = GetAuthenticatedUserId();
+	namestrcpy(&fdata->database, get_database_name(MyDatabaseId));
+	namestrcpy(&fdata->authenticated_user,
+			   GetUserNameFromId(fdata->authenticated_user_id));
+	shm_toc_insert(toc, 0, fdata);
+	if (leader_task_id >= 0)
+	{
+		fdata->use_group_locking = true;
+		if (task_id == leader_task_id)
+			fdata->leader_pid = 0;
+		else
+		{
+			worker_key	lkey;
+			worker_info *leader_info;
+
+			lkey.group_id = group_id;
+			lkey.task_id = leader_task_id;
+			leader_info = hash_search(worker_hash, &lkey, HASH_FIND, &found);
+			Assert(found);
+			if (GetBackgroundWorkerPid(leader_info->handle, &fdata->leader_pid)
+				!= BGWH_STARTED)
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("could not determine PID of leader %d.%d",
+								group_id, leader_task_id)));
+		}
+	}
+
+	/* Establish message queues in dynamic shared memory. */
+	requestq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE),
+							 SHM_QUEUE_SIZE);
+	shm_toc_insert(toc, 1, requestq);
+	shm_mq_set_sender(requestq, MyProc);
+	info->requesth = shm_mq_attach(requestq, info->seg, NULL);
+	responseq = shm_mq_create(shm_toc_allocate(toc, SHM_QUEUE_SIZE),
+							  SHM_QUEUE_SIZE);
+	shm_toc_insert(toc, 2, responseq);
+	shm_mq_set_receiver(responseq, MyProc);
+	info->responseh = shm_mq_attach(responseq, info->seg, NULL);
+
+	/* Configure a worker. */
+	worker.bgw_flags =
+		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+	worker.bgw_restart_time = BGW_NEVER_RESTART;
+	worker.bgw_main = NULL;		/* new worker might not have library loaded */
+	sprintf(worker.bgw_library_name, "test_group_locking");
+	sprintf(worker.bgw_function_name, "test_group_locking_worker_main");
+	snprintf(worker.bgw_name, BGW_MAXLEN,
+			 "test_group_locking %d/%d by PID %d", group_id, task_id, MyProcPid);
+	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(info->seg));
+	/* set bgw_notify_pid, so we can detect if the worker stops */
+	worker.bgw_notify_pid = MyProcPid;
+
+	/* Register the worker. */
+	if (!RegisterDynamicBackgroundWorker(&worker, &info->handle))
+		ereport(ERROR,
+				(errcode(ERRCODE_INSUFFICIENT_RESOURCES),
+				 errmsg("could not register background process"),
+				 errhint("You may need to increase max_worker_processes.")));
+	shm_mq_set_handle(info->requesth, info->handle);
+	shm_mq_set_handle(info->responseh, info->handle);
+
+	/* Wait for the worker to come online. */
+	info->awaiting_response = true;
+	for (;;)
+	{
+		check_for_messages(worker_hash);
+		if (!info->awaiting_response)
+			break;
+		WaitLatch(&MyProc->procLatch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+		CHECK_FOR_INTERRUPTS();
+		ResetLatch(&MyProc->procLatch);
+	}
+}
+
+/* Background worker entrypoint. */
+void
+test_group_locking_worker_main(Datum main_arg)
+{
+	dsm_segment *seg;
+	shm_toc    *toc;
+	shm_mq	   *requestq;
+	shm_mq	   *responseq;
+	shm_mq_handle *requesth;
+	shm_mq_handle *responseh;
+	worker_fixed_data *fdata;
+
+	/* Establish signal handlers. */
+	pqsignal(SIGTERM, handle_sigterm);
+	BackgroundWorkerUnblockSignals();
+
+	/* Set up a memory context and resource owner. */
+	Assert(CurrentResourceOwner == NULL);
+	CurrentResourceOwner = ResourceOwnerCreate(NULL, "test_group_locking");
+	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
+												 "test_group_locking",
+												 ALLOCSET_DEFAULT_MINSIZE,
+												 ALLOCSET_DEFAULT_INITSIZE,
+												 ALLOCSET_DEFAULT_MAXSIZE);
+
+	/* Connect to the dynamic shared memory segment. */
+	seg = dsm_attach(DatumGetUInt32(main_arg));
+	if (seg == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("unable to map dynamic shared memory segment")));
+	toc = shm_toc_attach(TEST_GROUP_LOCKING_MAGIC, dsm_segment_address(seg));
+	if (toc == NULL)
+		ereport(ERROR,
+				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+				 errmsg("bad magic number in dynamic shared memory segment")));
+
+	/* Find shared memory queues and attach to them. */
+	requestq = shm_toc_lookup(toc, 1);
+	shm_mq_set_receiver(requestq, MyProc);
+	requesth = shm_mq_attach(requestq, seg, NULL);
+	responseq = shm_toc_lookup(toc, 2);
+	shm_mq_set_sender(responseq, MyProc);
+	responseh = shm_mq_attach(responseq, seg, NULL);
+	pq_redirect_to_shm_mq(responseq, responseh);
+
+	/* Connect to database. */
+	fdata = shm_toc_lookup(toc, 0);
+	BackgroundWorkerInitializeConnection(NameStr(fdata->database),
+										 NameStr(fdata->authenticated_user));
+	if (fdata->database_id != MyDatabaseId ||
+		fdata->authenticated_user_id != GetAuthenticatedUserId())
+		ereport(ERROR,
+				(errmsg("user or database renamed during worker startup")));
+
+	/* Activate group locking, if appropriate. */
+	if (fdata->use_group_locking)
+	{
+		if (fdata->leader_pid == 0)
+			BecomeLockGroupLeader();
+		else
+		{
+			PGPROC	   *proc;
+
+			/*
+			 * This is a cheesy hack that I'm going with for the sake of
+			 * getting this test code running.  Don't really do it this way!
+			 *
+			 * In a real parallel computation, all of the workers in a lock
+			 * group would be started by the same process, which should pass
+			 * its own value of MyProc and its pid to those followers.  That
+			 * way, if the leader exits before the children are up and running,
+			 * they'll fail to join the lock group unless (a) the same PID
+			 * is again running and (b) it is a PostgreSQL process and (c) it
+			 * is using the same PGPROC as before and (d) it is again a lock
+			 * group leader.  Looking up the proc using the PID, as we're doing
+			 * here, loses the third of those guarantees - which is not a
+			 * catastrophe, but best avoided.
+			 */
+
+			proc = BackendPidGetProc(fdata->leader_pid);
+			if (proc == NULL
+				|| !BecomeLockGroupFollower(proc, fdata->leader_pid))
+				ereport(ERROR,
+						(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+						 errmsg("could not join lock group for leader PID %d",
+								fdata->leader_pid)));
+		}
+	}
+
+	/* Inform the backend that started us that we're up and running. */
+	pq_putmessage('C', "START", 6);
+
+	/* Begin a transaction. */
+	StartTransactionCommand();
+
+	/* Main loop: read and process messages. */
+	for (;;)
+	{
+		Size		nbytes;
+		void	   *data;
+		shm_mq_result result;
+		TestGroupLockCommand *command;
+
+		result = shm_mq_receive(requesth, &nbytes, &data, false);
+		if (result != SHM_MQ_SUCCESS)
+			ereport(FATAL,
+					(errcode(ERRCODE_CONNECTION_FAILURE),
+					 errmsg("connection to user backend lost")));
+		if (nbytes != sizeof(TestGroupLockCommand))
+			ereport(FATAL,
+					(errcode(ERRCODE_PROTOCOL_VIOLATION),
+					 errmsg("invalid command message from user backend")));
+		command = data;
+
+		switch (command->op)
+		{
+			case TGL_STOP:
+				CommitTransactionCommand();
+				pq_putmessage('C', "STOP", 5);
+				exit(0);
+				break;
+
+			case TGL_LOCK:
+				if (!ConditionalLockRelationOid(command->relid,
+												command->lockmode))
+					ereport(ERROR,
+							(errcode(ERRCODE_LOCK_NOT_AVAILABLE),
+							 errmsg("could not obtain %s on relation with OID %u",
+									lock_mode_names[command->lockmode],
+									command->relid)));
+				pq_putmessage('C', "LOCK", 5);
+				break;
+
+			case TGL_UNLOCK:
+				UnlockRelationOid(command->relid, command->lockmode);
+				pq_putmessage('C', "UNLOCK", 7);
+				break;
+
+			default:
+				elog(ERROR, "unknown operation: %d", (int) command->op);
+		}
+	}
+}
diff --git a/contrib/test_group_locking/test_group_locking.control b/contrib/test_group_locking/test_group_locking.control
new file mode 100644
index 0000000..3b69359
--- /dev/null
+++ b/contrib/test_group_locking/test_group_locking.control
@@ -0,0 +1,4 @@
+comment = 'Test code for group locking'
+default_version = '1.0'
+module_pathname = '$libdir/test_group_locking'
+relocatable = true
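The step grammar documented in the header comment of test_group_locking() (N[.M]:start, N[.M]:stop, N[.M]:lock:lockmode:relation, N[.M]:unlock:lockmode:relation, with M defaulting to 0) can be illustrated with a stripped-down standalone parser. This is a hypothetical sketch, not the harness's code. toy_step, toy_parse_step and the fixed field sizes are invented for illustration. The sketch handles a single step, so it does not split on ','. It also skips the lock-mode validation, quoted identifiers, case folding and relation lookup that the real scan_* helpers perform.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical parsed form of one step. */
typedef struct
{
	int			group_id;		/* N */
	int			task_id;		/* M, defaults to 0 */
	char		op[16];			/* start, stop, lock or unlock */
	char		lockmode[32];	/* only for lock/unlock */
	char		relation[64];	/* only for lock/unlock */
} toy_step;

/* Parse an unsigned decimal integer; false if there are no digits. */
static bool
toy_scan_int(const char **s, int *result)
{
	const char *start = *s;
	int			val = 0;

	while (**s >= '0' && **s <= '9')
	{
		val = val * 10 + (**s - '0');
		(*s)++;
	}
	if (*s == start)
		return false;
	*result = val;
	return true;
}

/* Copy characters up to ':' or end of string; false if empty or too long. */
static bool
toy_scan_field(const char **s, char *dst, size_t dstsize)
{
	size_t		len = strcspn(*s, ":");

	if (len == 0 || len >= dstsize)
		return false;
	memcpy(dst, *s, len);
	dst[len] = '\0';
	*s += len;
	return true;
}

/* Parse "N[.M]:op[:lockmode:relation]"; false on any syntax error. */
static bool
toy_parse_step(const char *s, toy_step *step)
{
	assert(s != NULL && step != NULL);
	memset(step, 0, sizeof(*step));

	if (!toy_scan_int(&s, &step->group_id))
		return false;
	if (*s == '.')
	{
		s++;
		if (!toy_scan_int(&s, &step->task_id))
			return false;
	}
	if (*s++ != ':' || !toy_scan_field(&s, step->op, sizeof(step->op)))
		return false;
	if (strcmp(step->op, "lock") == 0 || strcmp(step->op, "unlock") == 0)
	{
		if (*s++ != ':' ||
			!toy_scan_field(&s, step->lockmode, sizeof(step->lockmode)))
			return false;
		if (*s++ != ':' ||
			!toy_scan_field(&s, step->relation, sizeof(step->relation)))
			return false;
	}
	else if (strcmp(step->op, "start") != 0 && strcmp(step->op, "stop") != 0)
		return false;
	return *s == '\0';
}
```

For example, "1.0:lock:AccessExclusiveLock:foo" yields group 1, task 0, op "lock", mode "AccessExclusiveLock" and relation "foo", while "2:start" yields task 0 because M was omitted. The real harness also appends any missing start and stop steps afterwards (rationalize_steps()), which this sketch does not attempt.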
-- Sent via pgsql-hackers mailing list (pgsql-hackers@postgresql.org)