http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2c148caa/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java
new file mode 100755
index 0000000..42c3f01
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/distributed/DistributedLockServiceDUnitTest.java
@@ -0,0 +1,3246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.distributed;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.Assert;
+import junit.framework.AssertionFailedError;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.distributed.internal.DistributionManager;
+import com.gemstone.gemfire.distributed.internal.DistributionMessage;
+import com.gemstone.gemfire.distributed.internal.DistributionMessageObserver;
+import com.gemstone.gemfire.distributed.internal.locks.DLockGrantor;
+import com.gemstone.gemfire.distributed.internal.locks.DLockRemoteToken;
+import com.gemstone.gemfire.distributed.internal.locks.DLockRequestProcessor;
+import 
com.gemstone.gemfire.distributed.internal.locks.DLockRequestProcessor.DLockRequestMessage;
+import 
com.gemstone.gemfire.distributed.internal.locks.DLockRequestProcessor.DLockResponseMessage;
+import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+import com.gemstone.gemfire.distributed.internal.locks.DLockToken;
+import com.gemstone.gemfire.distributed.internal.locks.RemoteThread;
+import 
com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.util.StopWatch;
+import com.gemstone.gemfire.test.dunit.AsyncInvocation;
+import com.gemstone.gemfire.test.dunit.DistributedTestCase;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.Invoke;
+import com.gemstone.gemfire.test.dunit.LogWriterUtils;
+import com.gemstone.gemfire.test.dunit.RMIException;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.ThreadUtils;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.Wait;
+import com.gemstone.gemfire.test.dunit.WaitCriterion;
+
+/**
+ * This class tests distributed ownership via the DistributedLockService api.
+ */
+public class DistributedLockServiceDUnitTest extends DistributedTestCase {
+  
+  // Distributed system of this VM; assigned by connectDistributedSystem()
+  // and set back to null by destroyAllDLockServices().
+       protected static DistributedSystem dlstSystem;
+  // Lock blackboard, fetched by the constructor while this field is still null.
+  private static DistributedLockBlackboard blackboard;
+  // NOTE(review): no use of the fields below is visible in this part of the
+  // file; presumably shared state for helper threads of later tests - confirm.
+  protected static Object monitor = new Object();
+
+  private int hits = 0;
+  private int completes = 0;
+  private boolean done;
+  private boolean got;
+
+
+  /**
+   * Creates the test case and makes sure the static lock blackboard has
+   * been obtained.
+   *
+   * @param name the test name, passed on to the superclass constructor
+   * @throws RuntimeException wrapping any failure to obtain the blackboard
+   */
+  public DistributedLockServiceDUnitTest(String name) {
+    super(name);
+    if (blackboard != null) {
+      return; // blackboard already initialised
+    }
+    try {
+      blackboard = DistributedLockBlackboardImpl.getInstance();
+    } catch (Exception cause) {
+      throw new RuntimeException("initialization error", cause);
+    }
+  }
+  
+  /////////// Test lifecycle //////////
+ 
+  /**
+   * Case-level set-up: delegates to {@code disconnectAllFromDS()}.
+   * NOTE(review): presumably called by the dunit framework before the tests
+   * of this class run - confirm against DistributedTestCase.
+   */
+  public static void caseSetUp() throws Exception {
+    disconnectAllFromDS();
+  }
+  
+  /**
+   * Case-level tear-down: delegates to {@code disconnectAllFromDS()}.
+   * NOTE(review): presumably called by the dunit framework after the tests
+   * of this class have run - confirm against DistributedTestCase.
+   */
+  public static void caseTearDown() throws Exception {
+    disconnectAllFromDS();
+  }
+  
+  /**
+   * Connects this VM, and then every VM of every dunit host, to the
+   * distributed system by running {@link #connectDistributedSystem()} in
+   * each of them.
+   *
+   * @throws Exception if connecting fails locally or in a remote VM
+   */
+  @Override
+  public final void postSetUp() throws Exception {
+    // Create a DistributedSystem in this VM first ...
+    connectDistributedSystem();
+
+    // ... and then in every VM of every host.
+    for (int h = 0; h < Host.getHostCount(); h++) {
+      Host host = Host.getHost(h);
+
+      for (int v = 0; v < host.getVMCount(); v++) {
+        // invoked by name; the method takes no arguments
+        host.getVM(v).invoke(
+          DistributedLockServiceDUnitTest.class, "connectDistributedSystem", null);
+      }
+    }
+  }
+
+  /**
+   * Destroys all lock services in every dunit VM (by invoking
+   * {@code destroyAllDLockServices} by name) and forgets the grantor
+   * remembered by {@code assertGrantorIsConsistent}.
+   */
+  @Override
+  public final void preTearDown() throws Exception {
+    final Class<?> testClass = DistributedLockServiceDUnitTest.class;
+    Invoke.invokeInEveryVM(testClass, "destroyAllDLockServices");
+
+    // Disabled alternatives kept for reference: invoking
+    // "remoteDumpAllDLockServices" in every VM, and disconnectAllFromDS().
+
+    lockGrantor = null;
+  }
+  
+  /**
+   * Calls {@code DLockService.destroyAll()} in the VM it runs in and clears
+   * the cached {@code dlstSystem} reference. Invoked by name in every VM
+   * from {@code preTearDown()}.
+   */
+  public static void destroyAllDLockServices() {
+    DLockService.destroyAll();
+    dlstSystem = null;
+  }
+
+  /**
+   * Calls {@code DLockService.dumpAllServices()} in the VM it runs in.
+   * NOTE(review): presumably meant to be invoked by name in remote VMs when
+   * debugging - confirm before removing.
+   */
+  public static void remoteDumpAllDLockServices() {
+    DLockService.dumpAllServices();
+  }
+
+  ///////// Remote setup/teardown support
+
+  /**
+   * Obtains a DistributedSystem through a throw-away test instance and
+   * stores it in the static field {@code dlstSystem}.
+   */
+  protected static void connectDistributedSystem() {
+    DistributedLockServiceDUnitTest helper =
+        new DistributedLockServiceDUnitTest("dummy");
+    dlstSystem = helper.getSystem();
+  }
+
+  /////////  Public test methods
+  
+  /**
+   * Checks reentrant locking on a single service in this VM: the lock is
+   * taken twice by the same thread and is only reported as released after
+   * the second unlock.
+   */
+  public void testBasic() {
+    String serviceName = getUniqueName();
+    String objectName = "object";
+
+    // Create service
+    DistributedLockService service =
+        DistributedLockService.create(serviceName, dlstSystem);
+
+    // Not locked initially
+    assertFalse(service.isHeldByCurrentThread(objectName));
+
+    // Get lock (twice, to exercise reentrancy)
+    assertTrue(service.lock(objectName, 3000, -1));
+    assertTrue(service.isHeldByCurrentThread(objectName));
+    assertTrue(service.lock(objectName, 3000, -1));
+    assertTrue(service.isHeldByCurrentThread(objectName));
+
+    // Release lock; still held after the first unlock
+    service.unlock(objectName);
+    assertTrue(service.isHeldByCurrentThread(objectName));
+    service.unlock(objectName);
+    assertFalse(service.isHeldByCurrentThread(objectName));
+
+    // Destroy service
+    DistributedLockService.destroy(serviceName);
+  }
+  
+  /**
+   * Exercises the create/destroy life cycle of a lock service:
+   * a destroyed service rejects lock requests with
+   * LockServiceDestroyedException and is no longer returned by
+   * getServiceNamed; a recreated service is usable again; and a remote
+   * thread blocked in lock() is released with
+   * LockServiceDestroyedException when its service is destroyed.
+   *
+   * @throws Exception on unexpected failure in this VM or a remote VM
+   */
+  public void testCreateDestroy() throws Exception {
+    final String serviceName = getUniqueName();
+    final String abc = "abc";
+
+    // create and destroy dls
+    assertNull(DistributedLockService.getServiceNamed(serviceName));
+    DistributedLockService service =
+        DistributedLockService.create(serviceName, getSystem());
+    assertSame(service, DistributedLockService.getServiceNamed(serviceName));
+    DistributedLockService.destroy(serviceName);
+
+    // assert attempt to use dls throws LockServiceDestroyedException
+    try {
+      service.lock(abc, -1, -1);
+      fail("didn't get LockServiceDestroyedException");
+    } catch (LockServiceDestroyedException expected) {
+      // expected: the service was destroyed above
+    }
+
+    // assert that destroyed dls is no longer available
+    service = DistributedLockService.getServiceNamed(serviceName);
+    assertNull("" + service, service);
+
+    // recreate the dls
+    service = DistributedLockService.create(serviceName, getSystem());
+    assertTrue(!((DLockService)service).isDestroyed());
+    ((DLockService)service).checkDestroyed();
+
+    // get the same dls from another thread and hold a lock
+    Thread thread = new Thread(new Runnable() {
+      public void run() {
+        DistributedLockService dls =
+            DistributedLockService.getServiceNamed(serviceName);
+        assertTrue(!((DLockService)dls).isDestroyed());
+        ((DLockService)dls).checkDestroyed();
+        dls.lock(abc, -1, -1); // get lock on abc and hold it
+      }
+    });
+    thread.start();
+    ThreadUtils.join(thread, 30 * 1000);
+
+    // start a new thread to wait for lock on abc
+    AsyncInvocation remoteWaitingThread =
+      Host.getHost(0).getVM(0).invokeAsync(new SerializableRunnable() {
+      public void run() {
+        DistributedLockService dls =
+            DistributedLockService.create(serviceName, getSystem());
+        try {
+          dls.lock(abc, -1, -1); // waiting to get lock abc
+          fail("remoteWaitingThread got lock after dls destroyed");
+        }
+        catch (LockServiceDestroyedException expected) {
+          return;
+        }
+        fail("remoteWaitingThread lock failed to throw LockServiceDestroyedException");
+      }
+    });
+
+    // loop will handle race condition with 1 sec sleep and retry
+    int retry = 10;
+    for (int i = 0; i < retry; i++) {
+      try {
+        // destroy DLS and free up remoteWaitingThread
+        Host.getHost(0).getVM(0).invoke(new SerializableRunnable() {
+          public void run() {
+            DistributedLockService.destroy(serviceName);
+          }
+        });
+      }
+      catch (RMIException e) {
+        // race condition: remoteWaitingThread probably hasn't created DLS yet.
+        // Only retry while attempts remain; on the last attempt the failure
+        // must surface instead of being silently dropped.
+        if (i < retry - 1 && e.getCause() instanceof IllegalArgumentException) {
+          sleep(1000);
+          continue;
+        }
+        else {
+          throw e;
+        }
+      }
+      break; // completed so break out of loop
+    }
+
+    DistributedLockService.destroy(serviceName);
+
+    // make sure remoteWaitingThread stopped waiting and threw
+    // LockServiceDestroyedException
+    ThreadUtils.join(remoteWaitingThread, 10 * 1000);
+    if (remoteWaitingThread.exceptionOccurred()) {
+      Throwable e = remoteWaitingThread.getException();
+      com.gemstone.gemfire.test.dunit.Assert.fail(e.getMessage(), e);
+    }
+
+    // make sure LockServiceDestroyedException is thrown
+    try {
+      service.lock(abc, -1, -1);
+      fail("didn't get LockServiceDestroyedException");
+    } catch (LockServiceDestroyedException expected) {
+      // expected: the service was destroyed again above
+    }
+
+    // make sure getServiceNamed returns null
+    service = DistributedLockService.getServiceNamed(serviceName);
+    assertNull("" + service, service);
+  }
+
+  // Lock service used by the testFairness worker threads of this VM.
+  protected static DistributedLockService dls_testFairness;
+  // Locks granted to each testFairness worker thread of this VM, by thread index.
+  protected static int count_testFairness[] = new int[16];
+  // Set to true (remotely) to make the worker threads of this VM leave their loop.
+  protected static volatile boolean stop_testFairness;
+  // Completion flag per worker thread; true for slots that are not running.
+  protected static volatile boolean[] done_testFairness = new boolean[16];
+  static { Arrays.fill(done_testFairness, true); }
+
+  /**
+   * Starts 1, 4, 8 and 16 threads in VMs 0 to 3 that repeatedly lock and
+   * unlock the same lock for about five seconds, then asserts that the
+   * number of grants of every thread lies within 30% of the per-thread
+   * average (plus one).
+   *
+   * @throws Exception on unexpected failure in this VM or a remote VM
+   */
+  public void testFairness() throws Exception {
+    final String serviceName = "testFairness_" + getUniqueName();
+    final Object lock = "lock";
+
+    // get the lock and hold it until all threads are ready to go
+    DistributedLockService service =
+        DistributedLockService.create(serviceName, dlstSystem);
+    assertTrue(service.lock(lock, -1, -1));
+
+    final int[] vmThreads = new int[] { 1, 4, 8, 16 };
+    forNumVMsInvoke(vmThreads.length,
+                    "remoteCreateService",
+                    new Object[] { serviceName });
+    sleep(100);
+
+    // line up threads for the fairness race...
+    for (int i = 0; i < vmThreads.length; i++) {
+      final int vm = i;
+      LogWriterUtils.getLogWriter().info("[testFairness] lining up " +
+                          vmThreads[vm] +
+                          " threads in vm " + vm);
+
+      for (int j = 0; j < vmThreads[vm]; j++) {
+        final int thread = j;
+
+        Host.getHost(0).getVM(vm).invokeAsync(new SerializableRunnable() {
+          public void run() {
+            // lock, inc count, and unlock until stop_testFairness is set true
+            try {
+              done_testFairness[thread] = false;
+              dls_testFairness =
+                  DistributedLockService.getServiceNamed(serviceName);
+              while (!stop_testFairness) {
+                assertTrue(dls_testFairness.lock(lock, -1, -1));
+                count_testFairness[thread]++;
+                dls_testFairness.unlock(lock);
+              }
+            }
+            catch (VirtualMachineError e) {
+              SystemFailure.initiateFailure(e);
+              throw e;
+            }
+            catch (Throwable t) {
+              LogWriterUtils.getLogWriter().warning(t);
+              fail(t.getMessage());
+            }
+            finally {
+              // Always mark this worker as finished: the stop phase below
+              // spins until every flag is true and would otherwise hang
+              // forever after a worker failure.
+              done_testFairness[thread] = true;
+            }
+          }
+        });
+      }
+    }
+    sleep(500); // 500 ms
+
+    // start the race!
+    service.unlock(lock);
+    sleep(1000 * 5); // 5 seconds
+    assertTrue(service.lock(lock, -1, -1));
+
+    // stop the race...
+    for (int i = 0; i < vmThreads.length; i++) {
+      final int vm = i;
+      Host.getHost(0).getVM(vm).invoke(new SerializableRunnable() {
+        public void run() {
+          stop_testFairness = true;
+        }
+      });
+    }
+    service.unlock(lock);
+    for (int i = 0; i < vmThreads.length; i++) {
+      final int vm = i;
+      Host.getHost(0).getVM(vm).invoke(new SerializableRunnable() {
+        public void run() {
+          try {
+            // wait until every worker of this VM has left its loop
+            boolean testIsDone = false;
+            while (!stop_testFairness || !testIsDone) {
+              testIsDone = true;
+              for (int i2 = 0; i2 < done_testFairness.length; i2++) {
+                if (!done_testFairness[i2]) testIsDone = false;
+              }
+            }
+            DistributedLockService.destroy(serviceName);
+          }
+          catch (VirtualMachineError e) {
+            SystemFailure.initiateFailure(e);
+            throw e;
+          }
+          catch (Throwable t) {
+            fail(t.getMessage());
+          }
+        }
+      });
+    }
+
+    // calc total locks granted...
+    int totalLocks = 0;
+    int minLocks = Integer.MAX_VALUE;
+    int maxLocks = 0;
+
+    // add up total locks across all vms and threads...
+    int numThreads = 0;
+    for (int i = 0; i < vmThreads.length; i++) {
+      final int vm = i;
+      for (int j = 0; j < vmThreads[vm]; j++) {
+        final int thread = j;
+        Integer count = (Integer)Host.getHost(0).getVM(vm).invoke(() ->
+            DistributedLockServiceDUnitTest.get_count_testFairness(Integer.valueOf(thread)));
+        int numLocks = count.intValue();
+        if (numLocks < minLocks) minLocks = numLocks;
+        if (numLocks > maxLocks) maxLocks = numLocks;
+        totalLocks = totalLocks + numLocks;
+        numThreads++;
+      }
+    }
+
+    LogWriterUtils.getLogWriter().info("[testFairness] totalLocks=" +
+                        totalLocks +
+                        " minLocks=" + minLocks +
+                        " maxLocks=" + maxLocks);
+
+    int expectedLocks = (totalLocks / numThreads) + 1;
+
+    // allow 30% deviation around the expected per-thread count
+    int deviation = (int)(expectedLocks * 0.3);
+    int lowThreshold = expectedLocks - deviation;
+    int highThreshold = expectedLocks + deviation;
+
+    LogWriterUtils.getLogWriter().info("[testFairness] deviation=" + deviation +
+                        " expectedLocks=" + expectedLocks +
+                        " lowThreshold=" + lowThreshold +
+                        " highThreshold=" + highThreshold);
+
+    assertTrue("minLocks is less than lowThreshold",
+               minLocks >= lowThreshold);
+    assertTrue("maxLocks is greater than highThreshold",
+               maxLocks <= highThreshold);
+  }
+  
+  /**
+   * Returns the number of locks counted for one testFairness worker thread
+   * in the VM this method runs in.
+   * Invoked remotely from testFairness.  DO NOT REMOVE
+   *
+   * @param i index of the worker thread within its VM
+   * @return the current value of {@code count_testFairness[i]}
+   */
+  public static Integer get_count_testFairness(Integer i) {
+    return Integer.valueOf(count_testFairness[i.intValue()]);
+  }
+  
+  /**
+   * Runs {@code doOneGetsAndOthersTimeOut} with the argument pairs (1, 1)
+   * and (4, 3); the (2, 2) and (3, 2) runs are disabled.
+   * NOTE(review): the meaning of the two arguments is defined by the helper,
+   * which is not shown here - confirm there.
+   */
+  public void testOneGetsAndOthersTimeOut() throws Exception {
+    doOneGetsAndOthersTimeOut(1, 1);
+//     doOneGetsAndOthersTimeOut(2, 2);
+//     doOneGetsAndOthersTimeOut(3, 2);
+    doOneGetsAndOthersTimeOut(4, 3);
+  }
+  
+  // Grantor recorded by the first assertGrantorIsConsistent call; reset to
+  // null in preTearDown().
+  private InternalDistributedMember lockGrantor;
+
+  /**
+   * Remembers the first grantor it is given and asserts that every later
+   * call reports an equal grantor.
+   *
+   * @param id the grantor reported by some member
+   */
+  private synchronized void assertGrantorIsConsistent(InternalDistributedMember id) {
+    if (this.lockGrantor == null) {
+      this.lockGrantor = id;
+    } else {
+      assertEquals("assertGrantorIsConsistent failed", lockGrantor, id);
+    }
+  }
+  
+  /**
+   * Looks up the named lock service in the VM this method runs in and
+   * returns the member that service reports as lock grantor.
+   * Invoked remotely, both by name and from lambdas.  DO NOT REMOVE
+   *
+   * @param serviceName name of a lock service that already exists in this VM
+   * @return the grantor member; asserted to be non-null
+   */
+  public static InternalDistributedMember identifyLockGrantor(String serviceName) {
+    DLockService service = (DLockService)
+        DistributedLockService.getServiceNamed(serviceName);
+    assertNotNull(service);
+    InternalDistributedMember grantor =
+        service.getLockGrantorId().getLockGrantorMember();
+    assertNotNull(grantor);
+    logInfo("In identifyLockGrantor - grantor is " + grantor);
+    return grantor;
+  }
+  
+  /**
+   * Reports whether the VM this method runs in is the lock grantor of the
+   * named service.
+   * Invoked remotely, both by name and from lambdas.  DO NOT REMOVE.
+   *
+   * @param serviceName name of a lock service that already exists in this VM
+   * @return {@code Boolean.TRUE} if this member is the grantor
+   */
+  public static Boolean isLockGrantor(String serviceName) {
+    DLockService dlock =
+        (DLockService) DistributedLockService.getServiceNamed(serviceName);
+    assertNotNull(dlock);
+    final boolean grantorHere = dlock.isLockGrantor();
+    final Boolean answer = grantorHere ? Boolean.TRUE : Boolean.FALSE;
+    logInfo("In isLockGrantor: " + answer);
+    return answer;
+  }
+
+  /**
+   * Accessed via reflection.  DO NOT REMOVE.
+   * @param serviceName
+   */
+  protected static void becomeLockGrantor(String serviceName) {
+    DLockService service = (DLockService)
+        DistributedLockService.getServiceNamed(serviceName);
+    assertNotNull(service);
+    logInfo("About to call becomeLockGrantor...");
+    service.becomeLockGrantor();
+  }
+  
+  /**
+   * Creates the service in four VMs and asserts that all of them identify
+   * the same member as lock grantor.
+   */
+  public void testGrantorSelection() {
+    // TODO change distributedCreateService usage to be concurrent threads
+
+    // bring up 4 members and make sure all identify one as grantor
+    int numVMs = 4;
+    final String serviceName = "testGrantorSelection_" + getUniqueName();
+    distributedCreateService(numVMs, serviceName);
+    try {
+      Thread.sleep(100);
+    }
+    catch (InterruptedException ignore) {fail("interrupted");}
+
+    final Object[] args = new Object[] {serviceName};
+    final Host host = Host.getHost(0);
+    for (int vm=0; vm<numVMs; vm++) {
+      logInfo("VM " + vm + " in " + serviceName + " about to invoke");
+      InternalDistributedMember id =
+          (InternalDistributedMember)host.getVM(vm).invoke(
+              DistributedLockServiceDUnitTest.class,
+              "identifyLockGrantor",
+              args);
+      logInfo("VM " + vm + " in " + serviceName + " got " + id);
+      assertGrantorIsConsistent(id);
+    }
+  }
+  
+  /**
+   * Makes VM 3 the grantor of a service created in four VMs, disconnects
+   * it, and verifies (with up to three attempts) that the remaining VMs
+   * agree on a new grantor that differs from the old one.
+   */
+  public void testBasicGrantorRecovery() {
+    // 1) start up 4 VM members...
+    int numVMs = 4;
+    final String serviceName = "testBasicGrantorRecovery_" + getUniqueName();
+    distributedCreateService(numVMs, serviceName);
+    try {
+      Thread.sleep(100);
+    }
+    catch (InterruptedException ignore) {fail("interrupted");}
+
+    final Object[] args = new Object[] {serviceName};
+    final Host host = Host.getHost(0);
+
+    // the first member to ask for the grantor is expected to become it
+    int originalGrantor = 3;
+    host.getVM(originalGrantor).invoke(
+        DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+
+    // 2) find the grantor and disconnect him...
+    int originalVM = -1;
+    InternalDistributedMember oldGrantor = null;
+    for (int vm = 0; vm < numVMs; vm++) {
+      Boolean isGrantor = (Boolean)host.getVM(vm).invoke(
+          DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+      if (isGrantor.booleanValue()) {
+        originalVM = vm;
+        oldGrantor = (InternalDistributedMember)host.getVM(vm).invoke(
+            DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+        break;
+      }
+    }
+
+    assertEquals("VM that first asked for the grantor is not the grantor",
+                 originalGrantor, originalVM);
+
+    host.getVM(originalVM).invoke(new SerializableRunnable() {
+      public void run() {
+        disconnectFromDS();
+      }
+    });
+
+    try {
+      Thread.sleep(100);
+    }
+    catch (InterruptedException ignore) {fail("interrupted");}
+
+    // 3) verify that another member recovers for grantor
+    int attempts = 3;
+    for (int attempt = 0; attempt < attempts; attempt++) {
+      // Forget the grantor recorded by a previous attempt; otherwise a
+      // stale id seen while recovery was still in progress would make
+      // every later attempt fail the consistency check as well.
+      this.lockGrantor = null;
+      try {
+        for (int vm = 0; vm < numVMs; vm++) {
+          if (vm == originalVM) continue; // skip because he's disconnected
+          logInfo("[testBasicGrantorRecovery] VM " + vm +
+                  " in " + serviceName + " about to invoke");
+          InternalDistributedMember id =
+              (InternalDistributedMember)host.getVM(vm).invoke(
+                  DistributedLockServiceDUnitTest.class,
+                  "identifyLockGrantor",
+                  args);
+          logInfo("[testBasicGrantorRecovery] VM " + vm +
+                  " in " + serviceName + " got " + id);
+          assertGrantorIsConsistent(id);
+          assertFalse("New grantor must not equal the old grantor",
+                      id.equals(oldGrantor));
+          logInfo("[testBasicGrantorRecovery] new grantor " + id +
+                  " is not old grantor " + oldGrantor);
+        } // loop thru vms
+        logInfo("[testBasicGrantorRecovery] succeeded attempt " + attempt);
+        break; // success
+      }
+      catch (AssertionFailedError e) {
+        logInfo("[testBasicGrantorRecovery] failed attempt " + attempt);
+        if (attempt == attempts-1) throw e;
+      }
+    } // loop thru attempts
+  }
+  
+  /**
+   * Has three VMs each take a lock, disconnects the VM that is grantor
+   * (VM 0), and verifies that the two surviving VMs can still release
+   * their locks, acquire and release each other's keys, and agree on a
+   * single grantor.
+   */
+  public void testLockFailover() {
+    final int originalGrantorVM = 0;
+    final int oneVM = 1;
+    final int twoVM = 2;
+    final String serviceName = "testLockFailover-" + getUniqueName();
+
+    final Host host = Host.getHost(0);
+    final VM grantorVm = host.getVM(originalGrantorVM);
+    final VM vmOne = host.getVM(oneVM);
+    final VM vmTwo = host.getVM(twoVM);
+
+    // create lock services...
+    LogWriterUtils.getLogWriter().fine("[testLockFailover] create services");
+
+    grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+    vmOne.invoke(() ->
+        DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+    vmTwo.invoke(() ->
+        DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+    // the first member to ask for the grantor is expected to become it
+    grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.identifyLockGrantor( serviceName ));
+
+    Boolean isGrantor = (Boolean) grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.isLockGrantor( serviceName ));
+    assertEquals("First member calling getLockGrantor failed to become grantor",
+                 Boolean.TRUE, isGrantor);
+
+    // get locks...
+    LogWriterUtils.getLogWriter().fine("[testLockFailover] get lock");
+
+    Boolean locked = (Boolean) grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+originalGrantorVM ));
+    assertEquals("Failed to get lock in testLockFailover",
+                 Boolean.TRUE, locked);
+
+    locked = (Boolean) vmTwo.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+twoVM ));
+    assertEquals("Failed to get lock in testLockFailover",
+                 Boolean.TRUE, locked);
+
+    locked = (Boolean) vmOne.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+oneVM ));
+    assertEquals("Failed to get lock in testLockFailover",
+                 Boolean.TRUE, locked);
+
+    // disconnect originalGrantorVM...
+    LogWriterUtils.getLogWriter().fine("[testLockFailover] disconnect originalGrantorVM");
+
+    grantorVm.invoke(new SerializableRunnable() {
+      public void run() {
+        disconnectFromDS();
+      }
+    });
+
+    try {
+      Thread.sleep(100);
+    }
+    catch (InterruptedException ignore) {fail("interrupted");}
+
+    // verify locks by unlocking...
+    LogWriterUtils.getLogWriter().fine("[testLockFailover] release locks");
+
+    Boolean unlocked = (Boolean) vmTwo.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+twoVM ));
+    assertEquals("Failed to release lock in testLockFailover",
+                 Boolean.TRUE, unlocked);
+
+    unlocked = (Boolean) vmOne.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+oneVM ));
+    assertEquals("Failed to release lock in testLockFailover",
+                 Boolean.TRUE, unlocked);
+
+    // switch locks...
+    locked = (Boolean) vmOne.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+twoVM ));
+    assertEquals("Failed to get lock in testLockFailover",
+                 Boolean.TRUE, locked);
+
+    locked = (Boolean) vmTwo.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+oneVM ));
+    assertEquals("Failed to get lock in testLockFailover",
+                 Boolean.TRUE, locked);
+
+    unlocked = (Boolean) vmOne.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+twoVM ));
+    assertEquals("Failed to release lock in testLockFailover",
+                 Boolean.TRUE, unlocked);
+
+    unlocked = (Boolean) vmTwo.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+oneVM ));
+    assertEquals("Failed to release lock in testLockFailover",
+                 Boolean.TRUE, unlocked);
+
+    // verify grantor is unique...
+    LogWriterUtils.getLogWriter().fine("[testLockFailover] verify grantor identity");
+
+    InternalDistributedMember oneID = (InternalDistributedMember) vmOne.invoke(() ->
+        DistributedLockServiceDUnitTest.identifyLockGrantor(serviceName));
+    InternalDistributedMember twoID = (InternalDistributedMember) vmTwo.invoke(() ->
+        DistributedLockServiceDUnitTest.identifyLockGrantor(serviceName));
+    assertTrue("Failed to identifyLockGrantor in testLockFailover",
+               oneID != null && twoID != null);
+    assertEquals("Failed grantor uniqueness in testLockFailover",
+                 oneID, twoID);
+  }
+  
+  /**
+   * Has three VMs each take a lock while VM 0 is grantor, then makes VM 1
+   * the grantor through becomeLockGrantor and verifies that every VM can
+   * still release its lock exactly once (a second unlock in VM 1 must
+   * report false, i.e. the transferred token was not re-entered).
+   */
+  public void testLockThenBecomeLockGrantor() {
+    final int originalGrantorVM = 0;
+    final int becomeGrantorVM = 1;
+    final int thirdPartyVM = 2;
+    final String serviceName = "testLockThenBecomeLockGrantor-" +
+        getUniqueName();
+
+    final Host host = Host.getHost(0);
+    final VM grantorVm = host.getVM(originalGrantorVM);
+    final VM becomeVm = host.getVM(becomeGrantorVM);
+    final VM thirdVm = host.getVM(thirdPartyVM);
+
+    // create lock services...
+    LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] create services");
+
+    grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+    try {
+      Thread.sleep(20);
+    } catch (InterruptedException ignore) {fail("interrupted");}
+
+    becomeVm.invoke(() ->
+        DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+    thirdVm.invoke(() ->
+        DistributedLockServiceDUnitTest.remoteCreateService( serviceName ));
+
+    // the first member to ask for the grantor is expected to become it
+    grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.identifyLockGrantor( serviceName ));
+
+    Boolean isGrantor = (Boolean) grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.isLockGrantor( serviceName ));
+    assertEquals("First member calling getLockGrantor failed to become grantor",
+                 Boolean.TRUE, isGrantor);
+
+    // control...
+    LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] check control");
+    Boolean check = (Boolean) becomeVm.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+becomeGrantorVM ));
+    assertEquals("Check of control failed... unlock succeeded but nothing locked",
+                 Boolean.FALSE, check);
+
+    // get locks...
+    LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] get lock");
+
+    Boolean locked = (Boolean) grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+originalGrantorVM ));
+    assertEquals("Failed to get lock in testLockThenBecomeLockGrantor",
+                 Boolean.TRUE, locked);
+
+    locked = (Boolean) thirdVm.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+thirdPartyVM ));
+    assertEquals("Failed to get lock in testLockThenBecomeLockGrantor",
+                 Boolean.TRUE, locked);
+
+    locked = (Boolean) becomeVm.invoke(() ->
+        DistributedLockServiceDUnitTest.lock( serviceName, "KEY-"+becomeGrantorVM ));
+    assertEquals("Failed to get lock in testLockThenBecomeLockGrantor",
+                 Boolean.TRUE, locked);
+
+    // become lock grantor...
+    LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] become lock grantor");
+
+    becomeVm.invoke(() ->
+        DistributedLockServiceDUnitTest.becomeLockGrantor( serviceName ));
+
+    try {
+      Thread.sleep(20);
+    } catch (InterruptedException ignore) {fail("interrupted");}
+
+    isGrantor = (Boolean) becomeVm.invoke(() ->
+        DistributedLockServiceDUnitTest.isLockGrantor( serviceName ));
+    assertEquals("Failed to become lock grantor",
+                 Boolean.TRUE, isGrantor);
+
+    // verify locks by unlocking...
+    LogWriterUtils.getLogWriter().fine("[testLockThenBecomeLockGrantor] release locks");
+
+    Boolean unlocked = (Boolean) grantorVm.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+originalGrantorVM ));
+    assertEquals("Failed to release lock in testLockThenBecomeLockGrantor",
+                 Boolean.TRUE, unlocked);
+
+    unlocked = (Boolean) thirdVm.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+thirdPartyVM ));
+    assertEquals("Failed to release lock in testLockThenBecomeLockGrantor",
+                 Boolean.TRUE, unlocked);
+
+    unlocked = (Boolean) becomeVm.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+becomeGrantorVM ));
+    assertEquals("Failed to release lock in testLockThenBecomeLockGrantor",
+                 Boolean.TRUE, unlocked);
+
+    // test for bug in which transferred token gets re-entered causing lock
+    // recursion
+    unlocked = (Boolean) becomeVm.invoke(() ->
+        DistributedLockServiceDUnitTest.unlock( serviceName, "KEY-"+becomeGrantorVM ));
+    assertEquals("Transfer of tokens caused lock recursion in held lock",
+                 Boolean.FALSE, unlocked);
+  }
+  
+  /**
+   * Verifies that the lock grantor changes when a member that is not the
+   * current grantor calls becomeLockGrantor, and that the locks acquired
+   * beforehand can still be released afterwards.
+   */
+  public void testBecomeLockGrantor() {
+    // create lock services...
+    int numVMs = 4;
+    final String serviceName = "testBecomeLockGrantor-" + getUniqueName();
+    distributedCreateService(numVMs, serviceName);
+
+    // each one gets a lock...
+    for (int vm = 0; vm < numVMs; vm++) {
+      final int finalvm = vm;
+      Boolean locked = Host.getHost(0).getVM(finalvm).invoke(
+          () -> DistributedLockServiceDUnitTest.lock( serviceName, "obj-"+finalvm ));
+      assertEquals("Failed to get lock in testBecomeLockGrantor",
+                   Boolean.TRUE, locked);
+    }
+
+    // find the grantor...
+    final Object[] args = new Object[] {serviceName};
+    int originalVM = -1;
+    InternalDistributedMember oldGrantor = null;
+    for (int vm = 0; vm < numVMs; vm++) {
+      Boolean isGrantor = (Boolean)Host.getHost(0).getVM(vm).invoke(
+          DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+      if (isGrantor.booleanValue()) {
+        originalVM = vm;
+        oldGrantor = (InternalDistributedMember)Host.getHost(0).getVM(vm).invoke(
+            DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+        break;
+      }
+    }
+
+    LogWriterUtils.getLogWriter().fine(
+        "[testBecomeLockGrantor] original grantor is " + oldGrantor);
+
+    // have one call becomeLockGrantor (the first vm that is not the original grantor)
+    for (int vm = 0; vm < numVMs; vm++) {
+      if (vm != originalVM) {
+        Host.getHost(0).getVM(vm).invoke(
+            DistributedLockServiceDUnitTest.class, "becomeLockGrantor", args);
+        Boolean isGrantor = (Boolean)Host.getHost(0).getVM(vm).invoke(
+            DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+        assertEquals("isLockGrantor is false after calling becomeLockGrantor",
+                     Boolean.TRUE, isGrantor);
+        break;
+      }
+    }
+
+    LogWriterUtils.getLogWriter().fine(
+        "[testBecomeLockGrantor] one vm has called becomeLockGrantor...");
+
+    InternalDistributedMember newGrantor = null;
+    for (int vm = 0; vm < numVMs; vm++) {
+      Boolean isGrantor = (Boolean)Host.getHost(0).getVM(vm).invoke(
+          DistributedLockServiceDUnitTest.class, "isLockGrantor", args);
+      if (isGrantor.booleanValue()) {
+        newGrantor = (InternalDistributedMember)Host.getHost(0).getVM(vm).invoke(
+            DistributedLockServiceDUnitTest.class, "identifyLockGrantor", args);
+        break;
+      }
+    }
+    LogWriterUtils.getLogWriter().fine(
+        "[testBecomeLockGrantor] new Grantor is " + newGrantor);
+    // fail with a readable message instead of a NullPointerException when
+    // no vm reports itself as grantor
+    assertNotNull("No vm reported being lock grantor after becomeLockGrantor",
+                  newGrantor);
+    assertFalse("Lock grantor did not change from " + oldGrantor,
+                newGrantor.equals(oldGrantor));
+
+    // verify locks still held by unlocking
+    // each one unlocks...
+    for (int vm = 0; vm < numVMs; vm++) {
+      final int finalvm = vm;
+      Boolean unlocked = (Boolean) Host.getHost(0).getVM(finalvm).invoke(
+          () -> DistributedLockServiceDUnitTest.unlock( serviceName, "obj-"+finalvm ));
+      assertEquals("Failed to unlock in testBecomeLockGrantor",
+                   Boolean.TRUE, unlocked);
+    }
+
+    LogWriterUtils.getLogWriter().fine("[testBecomeLockGrantor] finished");
+
+    // NOTE(review): the original ends with the remark below and no code after
+    // it; presumably this step was never implemented.
+    // verify that pending requests are granted by unlocking them also
+  }
+
+  /**
+   * Four members each call tryLock on the same key. Exactly one call is
+   * expected to succeed, and exactly one of the following unlock calls is
+   * expected to report success.
+   */
+  public void testTryLock() {
+    final Long waitMillis = new Long(100);
+    final int memberCount = 4;
+
+    // create lock services...
+    LogWriterUtils.getLogWriter().fine("[testTryLock] create lock services");
+    final String serviceName = "testTryLock-" + getUniqueName();
+    distributedCreateService(memberCount, serviceName);
+
+    // all 4 vms scramble to get tryLock but only one should succeed...
+    LogWriterUtils.getLogWriter().fine("[testTryLock] attempt to get tryLock");
+    int winners = 0;
+    for (int index = 0; index < memberCount; index++) {
+      VM member = Host.getHost(0).getVM(index);
+      Boolean locked = (Boolean) member.invoke(
+          () -> DistributedLockServiceDUnitTest.tryLock( serviceName, "KEY", waitMillis ));
+      if (locked.booleanValue()) {
+        winners++;
+      }
+    }
+    assertEquals("More than one vm acquired the tryLock", 1, winners);
+
+    // every member tries to unlock; only the holder should report success
+    LogWriterUtils.getLogWriter().fine("[testTryLock] unlock tryLock");
+    int releases = 0;
+    for (int index = 0; index < memberCount; index++) {
+      VM member = Host.getHost(0).getVM(index);
+      Boolean unlocked = (Boolean) member.invoke(
+          () -> DistributedLockServiceDUnitTest.unlock( serviceName, "KEY" ));
+      if (unlocked.booleanValue()) {
+        releases++;
+      }
+    }
+    assertEquals("More than one vm unlocked the tryLock", 1, releases);
+  }
+  
+  /**
+   * Runs doOneGetsThenOtherGets for one VM with one thread and for four VMs
+   * with three threads each.
+   */
+  public void testOneGetsThenOtherGets()  throws Exception  {
+    // arguments are (numVMs, numThreadsPerVM)
+    doOneGetsThenOtherGets(1, 1);
+//     doOneGetsThenOtherGets(2, 2);
+//     doOneGetsThenOtherGets(3, 3);
+    doOneGetsThenOtherGets(4, 3);
+  }
+  
+  /**
+   * Locks two different names of one service from this VM, then holds a name
+   * in this VM while another VM runs getLockAndIncrement on a different name
+   * of the same service.
+   */
+  public void testLockDifferentNames() throws Exception  {
+    String serviceName = getUniqueName();
+
+    // Same VM: two distinct names can be held at once
+    remoteCreateService(serviceName);
+    DistributedLockService dls =
+        DistributedLockService.getServiceNamed(serviceName);
+    assertTrue(dls.lock("obj1", -1, -1));
+    assertTrue(dls.lock("obj2", -1, -1));
+    dls.unlock("obj1");
+    dls.unlock("obj2");
+
+    // Different VMs: this VM holds one name, the other VM locks another
+    VM otherVM = Host.getHost(0).getVM(0);
+    otherVM.invoke(() -> this.remoteCreateService(serviceName));
+    assertTrue(dls.lock("masterVMobj", -1, -1));
+
+    assertEquals(
+        Boolean.TRUE,
+        otherVM.invoke(() -> this.getLockAndIncrement(
+            serviceName, "otherVMobj", new Integer(-1), new Integer(0))));
+
+    dls.unlock("masterVMobj");
+  }
+  
+  /**
+   * Creates the service in this VM and expects getLockAndIncrement on
+   * "localVMobj" to return TRUE.
+   */
+  public void testLocalGetLockAndIncrement()  throws Exception {
+    String localServiceName = getUniqueName();
+    remoteCreateService(localServiceName);
+    DistributedLockService.getServiceNamed(localServiceName);
+    assertEquals(
+        Boolean.TRUE,
+        getLockAndIncrement(localServiceName, "localVMobj", -1, 0));
+  }
+  /**
+   * Creates the service in VM 0 and expects getLockAndIncrement on
+   * "remoteVMobj", invoked in that VM, to return TRUE.
+   */
+  public void testRemoteGetLockAndIncrement() {
+    String serviceName = getUniqueName();
+    VM remoteVM = Host.getHost(0).getVM(0);
+    remoteVM.invoke(() -> this.remoteCreateService(serviceName));
+    assertEquals(
+        Boolean.TRUE,
+        remoteVM.invoke(() -> this.getLockAndIncrement(
+            serviceName, "remoteVMobj", new Integer(-1), new Integer(0))));
+  }
+  
+  /**
+   * Locks the same object name in two different services, first with both
+   * locks taken in this VM and then with one taken here and the other taken
+   * in VM 0.
+   */
+  public void testLockSameNameDifferentService()
+  {
+    String serviceName1 = getUniqueName() + "_1";
+    String serviceName2 = getUniqueName() + "_2";
+    String sharedName = "obj";
+
+    // Same VM
+    remoteCreateService(serviceName1);
+    remoteCreateService(serviceName2);
+    DistributedLockService first =
+        DistributedLockService.getServiceNamed(serviceName1);
+    DistributedLockService second =
+        DistributedLockService.getServiceNamed(serviceName2);
+    assertTrue(first.lock(sharedName, -1, -1));
+    assertTrue(second.lock(sharedName, -1, -1));
+    first.unlock(sharedName);
+    second.unlock(sharedName);
+
+    // Different VMs
+    VM otherVM = Host.getHost(0).getVM(0);
+    otherVM.invoke(() -> this.remoteCreateService(serviceName1));
+    otherVM.invoke(() -> this.remoteCreateService(serviceName2));
+    assertTrue(first.lock(sharedName, -1, -1));
+    assertEquals(
+        Boolean.TRUE,
+        otherVM.invoke(() -> this.getLockAndIncrement(
+            serviceName2, sharedName, new Integer(-1), new Integer(0))));
+    first.unlock(sharedName);
+  }
+  
+  /**
+   * Holds a lock with a 60 second lease while a competing request with a
+   * 1 second wait is made, first from another thread and then from another
+   * VM. The competing request is expected to fail and the holder's unlock is
+   * expected to complete without LeaseExpiredException.
+   */
+  public void testLeaseDoesntExpire()
+  throws InterruptedException
+  {
+    String serviceName = getUniqueName();
+    final Object objName = new Integer(3);
+
+    // Same VM
+    remoteCreateService(serviceName);
+    final DistributedLockService service =
+        DistributedLockService.getServiceNamed(serviceName);
+    // lock objName with a sufficiently long lease
+    assertTrue(service.lock(objName, -1, 60000));
+    // try to lock in another thread, with a timeout shorter than above lease
+    final boolean[] rejected = new boolean[] { false };
+    Thread competitor = new Thread(() -> {
+      rejected[0] = !service.lock(objName, 1000, -1);
+    });
+    competitor.start();
+    ThreadUtils.join(competitor, 30 * 1000);
+    assertTrue(rejected[0]);
+    // the unlock should succeed without throwing LeaseExpiredException
+    service.unlock(objName);
+
+    // Different VM
+    VM otherVM = Host.getHost(0).getVM(0);
+    otherVM.invoke(() -> this.remoteCreateService(serviceName));
+    // lock objName in this VM with a sufficiently long lease
+    assertTrue(service.lock(objName, -1, 60000));
+    // try to lock in another VM, with a timeout shorter than above lease
+    assertEquals(
+        Boolean.FALSE,
+        otherVM.invoke(() -> this.getLockAndIncrement(
+            serviceName, objName, new Long(1000), new Long(0))));
+    // the unlock should succeed without throwing LeaseExpiredException
+    service.unlock(objName);
+  }
+
+  /**
+   * Checks isHeldByCurrentThread before locking, while the lock is held and
+   * after unlocking.
+   */
+  public void testLockUnlock() {
+    String serviceName = getUniqueName();
+    Object key = new Integer(42);
+
+    remoteCreateService(serviceName);
+    DistributedLockService dls =
+        DistributedLockService.getServiceNamed(serviceName);
+
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+
+    dls.lock(key, -1, -1);
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    dls.unlock(key);
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+  }
+  
+  /**
+   * Takes a lock with a 200 ms lease, sleeps 210 ms, and then expects the
+   * lock to be reported as not held and unlock to throw
+   * LeaseExpiredException.
+   */
+  public void testLockExpireUnlock() {
+    long leaseMs = 200;
+    long waitBeforeLockingMs = 210;
+
+    String serviceName = getUniqueName();
+    Object key = new Integer(42);
+
+    remoteCreateService(serviceName);
+    DistributedLockService dls =
+        DistributedLockService.getServiceNamed(serviceName);
+
+    assertFalse(dls.isHeldByCurrentThread(key));
+
+    assertTrue(dls.lock(key, -1, leaseMs));
+    assertTrue(dls.isHeldByCurrentThread(key));
+
+    sleep(waitBeforeLockingMs); // should expire...
+    assertFalse(dls.isHeldByCurrentThread(key));
+
+    try {
+      dls.unlock(key);
+      fail("unlock should have thrown LeaseExpiredException");
+    } catch (LeaseExpiredException expected) {
+      // expected: the lease ran out while we slept
+    }
+  }
+  
+  /**
+   * Locks the same name twice from one thread and checks that the lock stays
+   * held after the first unlock and is released by the second.
+   */
+  public void testLockRecursion() {
+    String serviceName = getUniqueName();
+    Object key = new Integer(42);
+
+    remoteCreateService(serviceName);
+    DistributedLockService dls =
+        DistributedLockService.getServiceNamed(serviceName);
+
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+
+    // initial lock...
+    Assert.assertTrue(dls.lock(key, -1, -1));
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    // recursion +1...
+    Assert.assertTrue(dls.lock(key, -1, -1));
+
+    // recursion -1...
+    dls.unlock(key);
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    // and unlock...
+    dls.unlock(key);
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+  }
+  
+  /**
+   * Checks that a lock whose 500 ms lease ran out does not leave recursion
+   * behind: after re-locking, a single unlock must release it. The scenario
+   * runs twice, first with a recursive lock and a failed unlock before the
+   * re-lock, then with a single lock and no unlock attempt.
+   */
+  public void testLockRecursionWithExpiration() {
+    long leaseMs = 500;
+    long waitBeforeLockingMs = 750;
+
+    String serviceName = getUniqueName();
+    Object key = new Integer(42);
+
+    remoteCreateService(serviceName);
+    DistributedLockService dls =
+        DistributedLockService.getServiceNamed(serviceName);
+
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+
+    // initial lock...
+    Assert.assertTrue(dls.lock(key, -1, leaseMs));
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    // recursion +1...
+    Assert.assertTrue(dls.lock(key, -1, leaseMs));
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    // expire...
+    sleep(waitBeforeLockingMs);
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+
+    // should fail...
+    try {
+      dls.unlock(key);
+      fail("unlock should have thrown LeaseExpiredException");
+    } catch (LeaseExpiredException expected) {
+      // expected: the lease ran out while we slept
+    }
+
+    // relock it...
+    Assert.assertTrue(dls.lock(key, -1, leaseMs));
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    // and unlock to verify no recursion...
+    dls.unlock(key);
+    Assert.assertFalse(dls.isHeldByCurrentThread(key)); // throws failure!!
+
+    // go thru again in different order...
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+
+    // initial lock...
+    Assert.assertTrue(dls.lock(key, -1, leaseMs));
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    // expire...
+    sleep(waitBeforeLockingMs);
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+
+    // relock it...
+    Assert.assertTrue(dls.lock(key, -1, leaseMs));
+    Assert.assertTrue(dls.isHeldByCurrentThread(key));
+
+    // and unlock to verify no recursion...
+    dls.unlock(key);
+    Assert.assertFalse(dls.isHeldByCurrentThread(key));
+  }
+  
+  /**
+   * Runs leaseExpiresTest with tryToLockBeforeExpiration set to false, so the
+   * competing lock request is made after waiting longer than the lease.
+   */
+  public void testLeaseExpiresBeforeOtherLocks()
+  throws InterruptedException
+  {
+    final boolean tryToLockBeforeExpiration = false;
+    leaseExpiresTest(tryToLockBeforeExpiration);
+  }
+  
+  /**
+   * Runs leaseExpiresTest with tryToLockBeforeExpiration set to true, so the
+   * competing lock request is made before the lease has run out.
+   */
+  public void testLeaseExpiresWhileOtherLocks()
+  throws InterruptedException
+  {
+    final boolean tryToLockBeforeExpiration = true;
+    leaseExpiresTest(tryToLockBeforeExpiration);
+  }
+  
+  private void leaseExpiresTest(boolean tryToLockBeforeExpiration)
+  throws InterruptedException
+  {
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] prepping");
+    long leaseMs = 100;
+    long waitBeforeLockingMs = tryToLockBeforeExpiration ? 50 : 110;
+
+    final String serviceName = getUniqueName();
+    final Object objName = new Integer(3);
+
+    // Same VM
+    remoteCreateService(serviceName);
+    final DistributedLockService service = 
DistributedLockService.getServiceNamed(serviceName);
+    
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] acquire first 
lock");
+    // lock objName with a short lease
+    assertTrue(service.lock(objName, -1, leaseMs));
+    sleep(waitBeforeLockingMs);
+
+    if (waitBeforeLockingMs > leaseMs) {
+      Assert.assertTrue(!service.isHeldByCurrentThread(objName));
+    }
+    
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] acquire lock that 
expired");
+    // try to lock in another thread - lease should have expired
+    final boolean[] resultHolder = new boolean[] { false };
+    Thread thread = new Thread(new Runnable() {
+      public void run() {
+        resultHolder[0] = service.lock(objName, -1, -1);
+        service.unlock(objName);
+        Assert.assertTrue(!service.isHeldByCurrentThread(objName));
+      }
+    });
+    thread.start();
+    ThreadUtils.join(thread, 30 * 1000);
+    assertTrue(resultHolder[0]);
+    
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] unlock should throw 
LeaseExpiredException");
+    // this thread's unlock should throw LeaseExpiredException
+    try {
+      service.unlock(objName);
+      fail("unlock should have thrown LeaseExpiredException");
+    } catch (LeaseExpiredException ex) {
+    }
+    
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] create service in 
other vm");
+    // Different VM
+    VM vm = Host.getHost(0).getVM(0);
+    vm.invoke(() -> this.remoteCreateService(serviceName));
+    
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] acquire lock again 
and expire");
+    // lock objName in this VM with a short lease
+    assertTrue(service.lock(objName, -1, leaseMs));
+    sleep(waitBeforeLockingMs);
+    
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] succeed lock in 
other vm");
+    // try to lock in another VM - should succeed
+    assertEquals(Boolean.TRUE, vm.invoke(() -> this.getLockAndIncrement(
+        serviceName, objName, new Long(-1), new Long(0)
+      )));
+      
+    LogWriterUtils.getLogWriter().fine("[testLeaseExpires] unlock should throw 
LeaseExpiredException again");
+    // this VMs unlock should throw LeaseExpiredException
+    try {
+      service.unlock(objName);
+      fail("unlock should have thrown LeaseExpiredException");
+    } catch (LeaseExpiredException ex) {
+    }
+  }
+
+  /**
+   * Lets a lock's lease run out in this VM, then has VM 0 suspend and resume
+   * locking on the same service and lock/unlock the same key.
+   */
+  public void testSuspendLockingAfterExpiration() throws Exception {
+    LogWriterUtils.getLogWriter().fine("[leaseExpiresThenSuspendTest]");
+
+    final long leaseMillis = 100;
+    final long suspendWaitMillis = 10000;
+
+    final String serviceName = getUniqueName();
+    final Object key = new Integer(3);
+
+    // controller locks key and then expires - controller is grantor
+    DistributedLockService controllerService =
+        DistributedLockService.create(serviceName, getSystem());
+    assertTrue(controllerService.lock(key, -1, leaseMillis));
+
+    // wait for expiration
+    sleep(leaseMillis*2);
+
+    LogWriterUtils.getLogWriter().fine(
+        "[leaseExpiresThenSuspendTest] unlock should throw LeaseExpiredException");
+    // this thread's unlock should throw LeaseExpiredException
+    try {
+      controllerService.unlock(key);
+      fail("unlock should have thrown LeaseExpiredException");
+    } catch (LeaseExpiredException expected) {
+      // expected: the lease ran out while we slept
+    }
+
+    // other vm calls suspend
+    LogWriterUtils.getLogWriter().fine(
+        "[leaseExpiresThenSuspendTest] call to suspend locking");
+    VM otherVM = Host.getHost(0).getVM(0);
+    otherVM.invoke(new SerializableRunnable() {
+      public void run() {
+        final DistributedLockService remoteService =
+            DistributedLockService.create(serviceName, getSystem());
+        remoteService.suspendLocking(suspendWaitMillis);
+        remoteService.resumeLocking();
+        assertTrue(remoteService.lock(key, -1, leaseMillis));
+        remoteService.unlock(key);
+      }
+    });
+  }
+  
+  // State shared between the test thread and the helper thread started by
+  // testLockInterruptiblyIsInterruptible and testLockIsNotInterruptible.
+  // Each of those tests resets the fields it uses before starting its thread.
+
+  // set to true by the helper thread as its first action
+  volatile boolean started = false;
+  // result of the helper thread's lock()/lockInterruptibly() call
+  volatile boolean gotLock = false;
+  // exception recorded by the helper thread (InterruptedException in
+  // testLockInterruptiblyIsInterruptible, any Throwable in testLockIsNotInterruptible)
+  volatile Throwable exception = null;
+  // any other Throwable caught by the helper thread in
+  // testLockInterruptiblyIsInterruptible
+  volatile Throwable throwable = null;
+
+  public void testLockInterruptiblyIsInterruptible() {
+    started = false;
+    gotLock = false;
+    exception = null;
+    throwable = null;
+
+    // Lock entire service in first thread
+    LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] 
get and hold the lock");
+    final String serviceName = getUniqueName();
+    final DistributedLockService service = 
+      DistributedLockService.create(serviceName, dlstSystem);
+    service.becomeLockGrantor();
+    assertTrue(service.lock("obj", 1000, -1));
+
+    // Start second thread that tries to lock in second thread
+    LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] 
call lockInterruptibly");
+    Thread thread2 = new Thread(new Runnable() {
+      public void run() {
+        try {
+          started = true;
+          gotLock = service.lockInterruptibly("obj", -1, -1);
+        } catch (InterruptedException ex) {
+          exception = ex;
+        } 
+        catch (VirtualMachineError e) {
+          SystemFailure.initiateFailure(e);
+          throw e;
+        }
+        catch (Throwable t) {
+          throwable = t;
+        }
+      }
+    });
+    thread2.start();
+    
+    // Interrupt second thread
+    LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] 
interrupt calling thread");
+    while (!started) Thread.yield();
+    thread2.interrupt();
+    ThreadUtils.join(thread2, 20 * 1000);
+
+    // Expect it got InterruptedException and didn't lock the service
+    LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] 
verify failed to get lock");
+    assertFalse(gotLock);
+    if (throwable != null) {
+      LogWriterUtils.getLogWriter().warning(
+        "testLockInterruptiblyIsInterruptible threw unexpected Throwable", 
+        throwable);
+    }
+    assertNotNull(exception);
+
+    // Unlock "obj" in first thread
+    LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] 
unlock the lock");
+    service.unlock("obj");
+
+    // Make sure it didn't get locked by second thread
+    LogWriterUtils.getLogWriter().info("[testLockInterruptiblyIsInterruptible] 
try to get lock with timeout should not fail");
+    assertTrue(service.lock("obj", 5000, -1));
+    DistributedLockService.destroy(serviceName);
+  }
+  
+  // Interrupt status of the helper thread in testLockIsNotInterruptible,
+  // recorded as that thread's last action.
+  volatile boolean wasFlagSet = false;
+
+  /**
+   * Holds "obj" in the test thread, has a second thread call lock() on it and
+   * interrupts that thread. The second thread is expected to keep waiting
+   * without an exception, to get the lock once the first thread unlocks, and
+   * to end with its interrupt flag set.
+   */
+  public void testLockIsNotInterruptible() {
+    // Lock entire service in first thread
+    LogWriterUtils.getLogWriter().fine(
+        "[testLockIsNotInterruptible] lock in first thread");
+    started = false;
+    gotLock = false;
+    exception = null;
+    wasFlagSet = false;
+
+    final String serviceName = getUniqueName();
+    final DistributedLockService service =
+        DistributedLockService.create(serviceName, dlstSystem);
+    assertTrue(service.lock("obj", 1000, -1));
+
+    // Start second thread that tries to lock in second thread
+    LogWriterUtils.getLogWriter().fine(
+        "[testLockIsNotInterruptible] attempt lock in second thread");
+    Thread waitingLocker = new Thread(() -> {
+      try {
+        started = true;
+        gotLock = service.lock("obj", -1, -1);
+        LogWriterUtils.getLogWriter().fine(
+            "[testLockIsNotInterruptible] thread2 finished lock() - got " + gotLock);
+      }
+      catch (VirtualMachineError vmError) {
+        SystemFailure.initiateFailure(vmError);
+        throw vmError;
+      }
+      catch (Throwable caught) {
+        LogWriterUtils.getLogWriter().warning(
+            "[testLockIsNotInterruptible] Caught...", caught);
+        exception = caught;
+      }
+      wasFlagSet = Thread.currentThread().isInterrupted();
+    });
+    waitingLocker.start();
+
+    // Interrupt second thread
+    LogWriterUtils.getLogWriter().fine(
+        "[testLockIsNotInterruptible] interrupt second thread");
+    while (!started) {
+      Thread.yield();
+    }
+    sleep(500);
+    waitingLocker.interrupt();
+    // Expect it didn't get an exception and didn't lock the service
+    sleep(500);
+    assertFalse(gotLock);
+    assertNull(exception);
+
+    // Unlock entire service in first thread
+    LogWriterUtils.getLogWriter().fine(
+        "[testLockIsNotInterruptible] unlock in first thread");
+    service.unlock("obj");
+    sleep(500);
+
+    // Expect that thread2 should now complete execution.
+    ThreadUtils.join(waitingLocker, 20 * 1000);
+
+    // Now thread2 should have gotten the lock, not the exception, but the
+    // thread's flag should be set
+    LogWriterUtils.getLogWriter().fine(
+        "[testLockIsNotInterruptible] verify second thread got lock");
+    assertNull(exception);
+    assertTrue(gotLock);
+    assertTrue(wasFlagSet);
+  }
+  
+  /**
+   * Exercises DistributedLockService.suspendLocking() and resumeLocking():
+   * resume without a prior suspend throws LockNotHeldException, a second
+   * suspend from the same thread throws IllegalStateException, and suspend
+   * returns false while another thread holds the suspension.
+   */
+  public void testSuspendLockingBasic()
+  throws InterruptedException {
+    final DistributedLockService dls =
+        DistributedLockService.create(getUniqueName(), dlstSystem);
+
+    // resuming when nothing is suspended must be rejected
+    try {
+      dls.resumeLocking();
+      fail("Didn't throw LockNotHeldException");
+    } catch (LockNotHeldException expected) {
+      // expected
+    }
+
+    assertTrue(dls.suspendLocking(-1));
+    dls.resumeLocking();
+
+    // It's not reentrant
+    assertTrue(dls.suspendLocking(1000));
+    try {
+      dls.suspendLocking(1);
+      fail("didn't get IllegalStateException");
+    } catch (IllegalStateException expected) {
+      // expected
+    }
+    dls.resumeLocking();
+
+    // Get "false" if another thread is holding it
+    Thread suspender = new Thread(() -> {
+      logInfo("new thread about to suspendLocking()");
+      assertTrue(dls.suspendLocking(1000));
+    });
+    suspender.start();
+    ThreadUtils.join(suspender, 30 * 1000);
+    logInfo("main thread about to suspendLocking");
+    assertFalse(dls.suspendLocking(1000));
+  }
+
+  /**
+   * Test that suspended (exclusive) locking prohibits locking activity:
+   * VM 1 can lock before this VM suspends locking, cannot while locking is
+   * suspended, and can again after this VM resumes locking.
+   */
+  public void testSuspendLockingProhibitsLocking()
+  {
+    final String name = getUniqueName();
+    distributedCreateService(2, name);
+    DistributedLockService localService =
+        DistributedLockService.getServiceNamed(name);
+
+    // Should be able to lock from other VM
+    VM vm1 = Host.getHost(0).getVM(1);
+    assertTrue(vm1.invoke(
+        () -> DistributedLockServiceDUnitTest.tryToLock( name )));
+
+    assertTrue(localService.suspendLocking(1000));
+
+    // vm1 is the grantor... use debugHandleSuspendTimeouts
+    vm1.invoke(new SerializableRunnable("setDebugHandleSuspendTimeouts") {
+      public void run() {
+        DLockService grantorService =
+            (DLockService) DistributedLockService.getServiceNamed(name);
+        assertTrue(grantorService.isLockGrantor());
+        grantorService.getGrantorWithNoSync().setDebugHandleSuspendTimeouts(5000);
+      }
+    });
+
+    // Shouldn't be able to lock a name from another VM
+    assertFalse(vm1.invoke(
+        () -> DistributedLockServiceDUnitTest.tryToLock( name )));
+
+    localService.resumeLocking();
+
+    vm1.invoke(new SerializableRunnable("unsetDebugHandleSuspendTimeouts") {
+      public void run() {
+        DLockService grantorService =
+            (DLockService) DistributedLockService.getServiceNamed(name);
+        assertTrue(grantorService.isLockGrantor());
+        grantorService.getGrantorWithNoSync().setDebugHandleSuspendTimeouts(0);
+      }
+    });
+
+    // Should be able to lock again
+    assertTrue(vm1.invoke(
+        () -> DistributedLockServiceDUnitTest.tryToLock( name )));
+  }
+
+  /**
+   * Test that suspend locking behaves under various usage patterns. This
+   * ensures that suspend and regular locks behave as ReadWriteLocks and
+   * processing occurs in order.
+   * <p>
+   * The scenario itself lives in doTestSuspendLockingBehaves; this wrapper
+   * stops and clears the suspend and lock clients in every VM afterwards,
+   * whether or not the scenario succeeded.
+   */
+  public void notestSuspendLockingBehaves() throws Exception {
+    try {
+      doTestSuspendLockingBehaves();
+    }
+    finally {
+      Invoke.invokeInEveryVM(new SerializableRunnable() {
+        public void run() {
+          // stop the suspend client; a failure here is only logged
+          try {
+            if (suspendClientSuspendLockingBehaves != null) {
+              suspendClientSuspendLockingBehaves.stop();
+              suspendClientSuspendLockingBehaves = null;
+            }
+          }
+          catch (VirtualMachineError vmError) {
+            SystemFailure.initiateFailure(vmError);
+            throw vmError;
+          }
+          catch (Throwable cleanupFailure) {
+            LogWriterUtils.getLogWriter().error(
+                "Error in testSuspendLockingBehaves finally", cleanupFailure);
+          }
+          // stop the lock client; a failure here is only logged
+          try {
+            if (lockClientSuspendLockingBehaves != null) {
+              lockClientSuspendLockingBehaves.stop();
+              lockClientSuspendLockingBehaves = null;
+            }
+          }
+          catch (VirtualMachineError vmError) {
+            SystemFailure.initiateFailure(vmError);
+            throw vmError;
+          }
+          catch (Throwable cleanupFailure) {
+            LogWriterUtils.getLogWriter().error(
+                "Error in testSuspendLockingBehaves finally", cleanupFailure);
+          }
+        }
+      });
+    }
+  }
+  private void doTestSuspendLockingBehaves() throws Exception {
+    final String dlsName = getUniqueName();
+    final VM vmGrantor = Host.getHost(0).getVM(0);
+    final VM vmOne = Host.getHost(0).getVM(1);
+    final VM vmTwo = Host.getHost(0).getVM(2);
+    final VM vmThree = Host.getHost(0).getVM(3);
+    final String key1 = "key1";
+    
+    // TODO: make sure suspend thread can get other locks
+    
+    // TODO: test local (in grantor) locks and suspends also
+    
+    // define some SerializableRunnables
+    final SerializableRunnable createDLS = 
+    new SerializableRunnable("Create "+dlsName) {
+      public void run() {
+        DistributedLockService.create(dlsName, getSystem());
+        lockClientSuspendLockingBehaves = new BasicLockClient(dlsName, key1);
+        suspendClientSuspendLockingBehaves = new BasicLockClient(dlsName, 
key1);
+        assertFalse(isLockGrantor(dlsName).booleanValue());
+      }
+    };
+    final SerializableRunnable suspendLocking = 
+    new SerializableRunnable("Suspend locking "+dlsName) {
+      public void run() {
+        suspendClientSuspendLockingBehaves.suspend();
+      }
+    };
+    final SerializableRunnable resumeLocking = 
+    new SerializableRunnable("Resume locking "+dlsName) {
+      public void run() {
+        suspendClientSuspendLockingBehaves.resume();
+      }
+    };
+    final SerializableRunnable lockKey = 
+    new SerializableRunnable("Get lock "+dlsName) {
+      public void run() {
+        lockClientSuspendLockingBehaves.lock();
+      }
+    };
+    final SerializableRunnable unlockKey = 
+    new SerializableRunnable("Unlock "+dlsName) {
+      public void run() {
+        lockClientSuspendLockingBehaves.unlock();
+      }
+    };
+    
+    // create grantor
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] Create 
grantor "+dlsName);
+    vmGrantor.invoke(new SerializableRunnable("Create grantor "+dlsName) {
+      public void run() {
+        DistributedLockService.create(dlsName, getSystem());
+        DistributedLockService.getServiceNamed(dlsName).lock(key1, -1, -1);
+        DistributedLockService.getServiceNamed(dlsName).unlock(key1);
+        assertTrue(isLockGrantor(dlsName).booleanValue());
+      }
+    });
+
+    // create dls in other vms
+//    getLogWriter().info("[testSuspendLockingBehaves] Create DLS in vmOne");
+    vmOne.invoke(createDLS);
+//    getLogWriter().info("[testSuspendLockingBehaves] Create DLS in vmTwo");
+    vmTwo.invoke(createDLS);
+//    getLogWriter().info("[testSuspendLockingBehaves] Create DLS in vmThree");
+    vmThree.invoke(createDLS);
+    
+    // get a lock
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] line up 
vms for lock");
+//    getLogWriter().info("[testSuspendLockingBehaves] vmOne lock");
+    vmOne.invoke(lockKey);
+//    getLogWriter().info("[testSuspendLockingBehaves] start vmTwoLocking");
+    AsyncInvocation vmTwoLocking =  vmTwo.invokeAsync(lockKey);
+    Wait.pause(2000); // make sure vmTwo is first in line
+//    getLogWriter().info("[testSuspendLockingBehaves] start vmThreeLocking");
+    AsyncInvocation vmThreeLocking = vmThree.invokeAsync(lockKey);
+    Wait.pause(2000);
+    
+    // make sure vmTwo and vmThree are still waiting for lock on key1
+//    getLogWriter().info("[testSuspendLockingBehaves] assert vmTwoLocking 
still alive");
+    Wait.pause(100);
+    assertTrue(vmTwoLocking.isAlive());
+//    getLogWriter().info("[testSuspendLockingBehaves] assert vmThreeLocking 
still alive");
+    Wait.pause(100);
+    assertTrue(vmThreeLocking.isAlive());
+    
+    // let vmTwo get key
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] unlock so 
vmTwo can get key");
+//    getLogWriter().info("[testSuspendLockingBehaves] vmOne unlock");
+    vmOne.invoke(unlockKey);
+    ThreadUtils.join(vmTwoLocking, 10 * 1000);
+
+    // start suspending in vmOne and vmTwo
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] start 
suspending requests");
+//    getLogWriter().info("[testSuspendLockingBehaves] start vmOneSuspending");
+    AsyncInvocation vmOneSuspending = vmOne.invokeAsync(suspendLocking);
+    Wait.pause(2000); // make sure vmOne is first in line
+//    getLogWriter().info("[testSuspendLockingBehaves] start vmTwoSuspending");
+    AsyncInvocation vmTwoSuspending  = vmTwo.invokeAsync(suspendLocking);
+    Wait.pause(2000); 
+    
+    // let vmThree finish locking key
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] unlock so 
vmThree can get key");
+//    getLogWriter().info("[testSuspendLockingBehaves] vmTwo unlock");
+    vmTwo.invoke(unlockKey);
+    ThreadUtils.join(vmThreeLocking, 10 * 1000);
+    
+    // have vmOne get back in line for locking key
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] start 
another lock request");
+//    getLogWriter().info("[testSuspendLockingBehaves] start 
vmOneLockingAgain");
+    AsyncInvocation vmOneLockingAgain = vmOne.invokeAsync(lockKey);
+    Wait.pause(2000); 
+    
+    // let vmOne suspend locking
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let vmOne 
suspend locking");
+//    getLogWriter().info("[testSuspendLockingBehaves] assert vmOneSuspending 
still alive");
+    Wait.pause(100);
+    assertTrue(vmOneSuspending.isAlive());
+//    getLogWriter().info("[testSuspendLockingBehaves] vmThree unlock");
+    vmThree.invoke(unlockKey);
+    ThreadUtils.join(vmOneSuspending, 10 * 1000);
+    
+    // start suspending in vmThree
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] line up 
vmThree for suspending");
+//    getLogWriter().info("[testSuspendLockingBehaves] start 
vmThreeSuspending");
+    AsyncInvocation vmThreeSuspending  = vmThree.invokeAsync(suspendLocking);
+    Wait.pause(2000); 
+
+    // let vmTwo suspend locking
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let vmTwo 
suspend locking");
+//    getLogWriter().info("[testSuspendLockingBehaves] assert vmTwoSuspending 
still alive");
+    Wait.pause(100);
+    assertTrue(vmTwoSuspending.isAlive());
+//    getLogWriter().info("[testSuspendLockingBehaves] vmOne resumes locking");
+    vmOne.invoke(resumeLocking);
+    ThreadUtils.join(vmTwoSuspending, 10 * 1000);
+    
+    // let vmOne get that lock
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let vmOne 
get that lock");
+//    getLogWriter().info("[testSuspendLockingBehaves] assert 
vmOneLockingAgain still alive");
+    Wait.pause(100);
+    assertTrue(vmOneLockingAgain.isAlive());
+//    getLogWriter().info("[testSuspendLockingBehaves] vmTwo resumes locking");
+    vmTwo.invoke(resumeLocking);
+    ThreadUtils.join(vmOneLockingAgain, 10 * 1000);
+    
+    // let vmThree suspend locking
+    LogWriterUtils.getLogWriter().info("[testSuspendLockingBehaves] let 
vmThree suspend locking");
+//    getLogWriter().info("[testSuspendLockingBehaves] assert 
vmThreeSuspending still alive");
+    Wait.pause(100);
+    assertTrue(vmThreeSuspending.isAlive());
+//    getLogWriter().info("[testSuspendLockingBehaves] vmOne unlocks again");
+    vmOne.invoke(unlockKey);
+    ThreadUtils.join(vmThreeSuspending, 10 * 1000);
+
+    // done
+//    getLogWriter().info("[testSuspendLockingBehaves] vmThree resumes 
locking");
+    vmThree.invoke(resumeLocking);
+  }
+  protected static BasicLockClient suspendClientSuspendLockingBehaves; // NOTE(review): presumably the client that issues suspend/resume requests for testSuspendLockingBehaves; its use is not visible here, confirm
+  protected static BasicLockClient lockClientSuspendLockingBehaves; // NOTE(review): presumably the client that issues lock/unlock requests for testSuspendLockingBehaves; its use is not visible here, confirm
+  
+  /**
+   * Tests that {@code suspendLocking} blocks while another VM holds a lock
+   * and completes once that lock has been released.
+   * <p>
+   * VM 1 locks "lock" and then waits on {@code monitor}. A local thread
+   * calls {@code suspendLocking(-1)}; it must still be blocked 100 ms later.
+   * VM 1 is then notified so that it unlocks, after which the local thread
+   * is expected to report success within 30 seconds.
+   *
+   * @throws InterruptedException if this thread is interrupted while sleeping
+   */
+  public void testSuspendLockingBlocksUntilNoLocks()
+  throws InterruptedException
+  {
+
+    final String name = getUniqueName();
+    distributedCreateService(2, name);
+    final DistributedLockService service =
+      DistributedLockService.getServiceNamed(name);
+
+    // Get lock from other VM.  Since same thread needs to lock and unlock,
+    // invoke asynchronously, get lock, wait to be notified, then unlock.
+    VM vm1 = Host.getHost(0).getVM(1);
+    vm1.invokeAsync(new SerializableRunnable("Lock & unlock in vm1") {
+      public void run() {
+        DistributedLockService service2 =
+          DistributedLockService.getServiceNamed(name);
+        assertTrue(service2.lock("lock", -1, -1));
+        synchronized (monitor) {
+          try {
+            monitor.wait();
+          } catch (InterruptedException ex) {
+            System.out.println("Unexpected InterruptedException");
+            fail("interrupted");
+          }
+        }
+        service2.unlock("lock");
+      }
+    });
+    // Let vm1's thread get the lock and go into wait()
+    Thread.sleep(100);
+
+    // Local thread that suspends locking, records the outcome, then resumes
+    Thread thread = new Thread(new Runnable() {
+      public void run() {
+        setGot(service.suspendLocking(-1));
+        setDone(true);
+        service.resumeLocking();
+      }
+    });
+    setGot(false);
+    setDone(false);
+    thread.start();
+
+    // Let thread start, make sure it's blocked in suspendLocking
+    Thread.sleep(100);
+    assertFalse("Before release, got: " + getGot() + ", done: " + getDone(),
+        getGot() || getDone());
+
+    vm1.invoke(new SerializableRunnable("notify vm1 to unlock") {
+      public void run() {
+        synchronized (monitor) {
+          monitor.notify();
+        }
+      }
+    });
+
+    // Let thread finish, make sure it successfully suspended and is done
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return getDone();
+      }
+      public String description() {
+        // Previously null, which left a timeout without any explanation
+        return "waiting for local thread to return from suspendLocking"
+            + " (got: " + getGot() + ", done: " + getDone() + ")";
+      }
+    };
+    Wait.waitForCriterion(ev, 30 * 1000, 200, true);
+    if (!getGot() || !getDone()) {
+      ThreadUtils.dumpAllStacks();
+    }
+    assertTrue("After release, got: " + getGot() + ", done: " + getDone(),
+        getGot() && getDone());
+
+  }
+  
+  /**
+   * Verifies that a thread calling {@code suspendLockingInterruptibly} while
+   * locking is already suspended ends with an {@code InterruptedException}
+   * when interrupted, and that it does not acquire the suspension.
+   */
+  public void testSuspendLockingInterruptiblyIsInterruptible() {
+    // Clear the state shared with the contender thread
+    exception = null;
+    gotLock = false;
+    started = false;
+
+    // This (first) thread suspends locking for the whole service
+    final String serviceName = getUniqueName();
+    final DistributedLockService lockService =
+        DistributedLockService.create(serviceName, dlstSystem);
+    assertTrue(lockService.suspendLocking(1000));
+
+    // The contender also tries to suspend locking, without a timeout
+    final Runnable interruptibleSuspend = new Runnable() {
+      public void run() {
+        try {
+          started = true;
+          gotLock = lockService.suspendLockingInterruptibly(-1);
+        } catch (InterruptedException ex) {
+          exception = ex;
+        }
+      }
+    };
+    final Thread contender = new Thread(interruptibleSuspend);
+    contender.start();
+
+    // Spin until the contender is running, interrupt it, wait for it to end
+    while (!started) {
+      Thread.yield();
+    }
+    contender.interrupt();
+    ThreadUtils.join(contender, 20 * 1000);
+
+    // The contender must have seen the interrupt and must not hold the
+    // suspension
+    sleep(500);
+    assertFalse(gotLock);
+    assertNotNull(exception);
+
+    // First thread gives the suspension back
+    lockService.resumeLocking();
+    sleep(500);
+
+    // Suspending again only succeeds if the contender never got it
+    assertTrue(lockService.suspendLocking(1000));
+    DistributedLockService.destroy(serviceName);
+  }
+  
+  public void testSuspendLockingIsNotInterruptible() {
+    
+    started = false;
+    gotLock = false;
+    exception = null;
+    wasFlagSet = false;
+
+    // Lock entire service in first thread
+    final String name = getUniqueName();
+    final DistributedLockService service = 
+      DistributedLockService.create(name, dlstSystem);
+    assertTrue(service.suspendLocking(1000));
+
+    // Start second thread that tries to lock in second thread
+    Thread thread2 = new Thread(new Runnable() {
+      public void run() {
+        try {
+          started = true;
+          gotLock = service.suspendLocking(-1);
+        } 
+        catch (VirtualMachineError e) {
+          SystemFailure.initiateFailure(e);
+          throw e;
+        }
+        catch (Throwable ex) {
+          exception = ex;
+        }
+        wasFlagSet = Thread.currentThread().isInterrupted();
+     }
+    });
+    thread2.start();
+    
+    // Interrupt second thread
+    while (!started) Thread.yield();
+    thread2.interrupt();
+    // Expect it didn't get an exception and didn't lock the service
+    sleep(500);
+    assertFalse(gotLock);
+    assertNull(exception);
+
+    // Unlock entire service in first thread
+    service.resumeLocking();
+    ThreadUtils.join(thread2, 20 * 1000);
+
+    // Now thread2 should have gotten the lock, not the exception, but the
+    // thread's flag should be set
+    
LogWriterUtils.getLogWriter().info("[testSuspendLockingIsNotInterruptible]" +
+      " gotLock=" + gotLock +
+      " wasFlagSet=" + wasFlagSet +
+      " exception=" + exception, exception);
+    assertTrue(gotLock);
+    assertNull(exception);
+    assertTrue(wasFlagSet);
+  }
+  
+  /**
+   * Locking through a service handle whose service has already been
+   * destroyed must throw {@link LockServiceDestroyedException} rather than
+   * return a result.
+   *
+   * @author David Whitlock
+   */
+  public void testLockDestroyedService() {
+    final String destroyedName = getUniqueName();
+    final DistributedLockService staleService =
+        DistributedLockService.create(destroyedName, dlstSystem);
+    DistributedLockService.destroy(destroyedName);
+
+    final boolean lockResult;
+    try {
+      lockResult = staleService.lock("TEST", -1, -1);
+    } catch (LockServiceDestroyedException expected) {
+      // this is the required outcome
+      return;
+    }
+    fail("Lock of destroyed service returned: " + lockResult);
+  }
+  
+  /**
+   * Locks and unlocks "key" in this VM, lets VM 0 lock it with a lease time
+   * of 360000, unlock it and disconnect, and then asserts that this VM can
+   * lock "key" again.
+   */
+  public void testDepartedLastOwnerWithLease() {
+    final String lockServiceName = getUniqueName();
+
+    // This VM creates the service and takes "key" once
+    final DistributedLockService localService =
+        DistributedLockService.create(lockServiceName, dlstSystem);
+    assertTrue(localService.lock("key", -1, -1));
+    localService.unlock("key");
+
+    // VM 0 becomes the last owner (leased lock) and then leaves
+    final VM departingVm = Host.getHost(0).getVM(0);
+    final SerializableRunnable leaseThenDepart = new SerializableRunnable() {
+      public void run() {
+        final DistributedLockService remoteService =
+            DistributedLockService.create(lockServiceName, dlstSystem);
+        remoteService.lock("key", -1, 360000);
+        remoteService.unlock("key");
+        // Give asynchronous messaging time to complete
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ex) {
+          fail("interrupted");
+        }
+        disconnectFromDS();
+      }
+    };
+    departingVm.invoke(leaseThenDepart);
+
+    // The lock must be obtainable here once the last owner is gone
+    assertTrue(localService.lock("key", -1, -1));
+  }
+  
+  /**
+   * Locks and unlocks "key" in this VM, lets VM 0 lock it with a lease time
+   * of -1, unlock it and disconnect, and then asserts that this VM can lock
+   * "key" again.
+   */
+  public void testDepartedLastOwnerNoLease() {
+    final String lockServiceName = getUniqueName();
+
+    // This VM creates the service and takes "key" once
+    final DistributedLockService localService =
+        DistributedLockService.create(lockServiceName, dlstSystem);
+    assertTrue(localService.lock("key", -1, -1));
+    localService.unlock("key");
+
+    // VM 0 becomes the last owner (no lease) and then leaves
+    final VM departingVm = Host.getHost(0).getVM(0);
+    final SerializableRunnable lockThenDepart = new SerializableRunnable() {
+      public void run() {
+        final DistributedLockService remoteService =
+            DistributedLockService.create(lockServiceName, dlstSystem);
+        remoteService.lock("key", -1, -1);
+        remoteService.unlock("key");
+        // Give asynchronous messaging time to complete
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ex) {
+          fail("interrupted");
+        }
+        disconnectFromDS();
+      }
+    };
+    departingVm.invoke(lockThenDepart);
+
+    // The lock must be obtainable here once the last owner is gone
+    assertTrue(localService.lock("key", -1, -1));
+  }
+  
+  /**
+   * Regression test for 32461 (R3): StuckLocks can occur on locks with an
+   * expiration lease.
+   * <p>
+   * VM-A locks and unlocks the name, VM-B locks it with a lease, destroys its
+   * service and disconnects, and VM-C then attempts to lock the same name.
+   * The old dlock threw StuckLockException at that point; VM-C should now
+   * succeed in acquiring the lock.
+   */
+  public void testBug32461() throws Exception {
+    LogWriterUtils.getLogWriter().fine("[testBug32461] prepping");
+
+    final int VM_A = 0;
+    final int VM_B = 1;
+    final int VM_C = 2;
+    final Object objName = "32461";
+    final String serviceName = getUniqueName();
+    final Host host = Host.getHost(0);
+
+    // Step 1: VM-A locks and unlocks the name
+    LogWriterUtils.getLogWriter().fine("[testBug32461] VM-A locks/unlocks '32461'");
+    final SerializableRunnable lockAndUnlockWithLease = new SerializableRunnable() {
+      public void run() {
+        remoteCreateService(serviceName);
+        final DistributedLockService dls =
+            DistributedLockService.getServiceNamed(serviceName);
+        assertTrue(dls.lock(objName, -1, Long.MAX_VALUE));
+        dls.unlock(objName);
+      }
+    };
+    host.getVM(VM_A).invoke(lockAndUnlockWithLease);
+
+    // Step 2: VM-B takes the lock with a lease and departs without unlocking
+    LogWriterUtils.getLogWriter().fine("[testBug32461] VM_B leases '32461' and disconnects");
+    final SerializableRunnable leaseAndDepart = new SerializableRunnable() {
+      public void run() {
+        remoteCreateService(serviceName);
+        final DistributedLockService dls =
+            DistributedLockService.getServiceNamed(serviceName);
+        assertTrue(dls.lock(objName, -1, Long.MAX_VALUE));
+        DistributedLockService.destroy(serviceName);
+        disconnectFromDS();
+      }
+    };
+    host.getVM(VM_B).invoke(leaseAndDepart);
+
+    // Step 3: VM-C must be able to acquire (and release) the same name
+    LogWriterUtils.getLogWriter().fine("[testBug32461] VM_C attempts to lock '32461'");
+    final SerializableRunnable lockAndUnlock = new SerializableRunnable() {
+      public void run() {
+        remoteCreateService(serviceName);
+        final DistributedLockService dls =
+            DistributedLockService.getServiceNamed(serviceName);
+        assertTrue(dls.lock(objName, -1, -1));
+        dls.unlock(objName);
+      }
+    };
+    host.getVM(VM_C).invoke(lockAndUnlock);
+  }
+  
+  /**
+   * Has VM 0 lock two names (one with a 360000 lease time, one with -1) and
+   * disconnect without unlocking either, then asserts that this VM can still
+   * lock and unlock both names.
+   */
+  public void testNoStuckLock() {
+    final Object keyWithLease = "key-with-lease";
+    final Object keyNoLease = "key-no-lease";
+    final String lockServiceName = getUniqueName();
+
+    // This VM creates the service and cycles both keys once
+    final DistributedLockService localService =
+        DistributedLockService.create(lockServiceName, dlstSystem);
+
+    assertTrue(localService.lock(keyWithLease, -1, -1));
+    localService.unlock(keyWithLease);
+
+    assertTrue(localService.lock(keyNoLease, -1, -1));
+    localService.unlock(keyNoLease);
+
+    // VM 0 grabs both keys and disconnects while still holding them
+    final SerializableRunnable lockBothThenDepart = new SerializableRunnable() {
+      public void run() {
+        final DistributedLockService remoteService =
+            DistributedLockService.create(lockServiceName, dlstSystem);
+        remoteService.lock(keyWithLease, -1, 360000);
+        remoteService.lock(keyNoLease, -1, -1);
+        disconnectFromDS();
+      }
+    };
+    Host.getHost(0).getVM(0).invoke(lockBothThenDepart);
+
+    // Neither key may be stuck: both must be lockable here again
+    assertTrue(localService.lock(keyWithLease, -1, -1));
+    localService.unlock(keyWithLease);
+    assertTrue(localService.lock(keyNoLease, -1, -1));
+    localService.unlock(keyNoLease);
+  }
+  
+  volatile boolean startedThread1_testReleaseOrphanedGrant; // set by thread1 in testReleaseOrphanedGrant_Local once it holds "obj"; polled by the test thread
+  volatile boolean releaseThread1_testReleaseOrphanedGrant; // set by the test thread (under thread1's monitor) to let thread1 stop waiting and unlock
+  volatile boolean startedThread2_testReleaseOrphanedGrant; // set by thread2 just before it calls lockInterruptibly; polled by the test thread
+  volatile boolean gotLockThread2_testReleaseOrphanedGrant; // only reset to false by testReleaseOrphanedGrant_Local; no other use is visible in this part of the file
+  /**
+   * Client requests lock and then interrupts lock request before processing
+   * the grant reply. This causes the Client to send a release msg to the
+   * grantor.
+   * <p>
+   * Sequence used by this test, all within one VM:
+   * <ol>
+   * <li>thread1 locks "obj", turns on
+   * {@code setWaitToProcessDLockResponse(true)} and waits to be released;</li>
+   * <li>thread2 calls {@code lockInterruptibly("obj", -1, -1)};</li>
+   * <li>the test thread releases thread1 (which unlocks "obj"), then
+   * interrupts thread2 and joins it;</li>
+   * <li>the test thread turns {@code setWaitToProcessDLockResponse} off again
+   * and asserts that "obj" can be locked with a wait time of 1000.</li>
+   * </ol>
+   * Both {@code DLockRequestProcessor} debug switches are turned off again in
+   * the {@code finally} block.
+   */
+  public void testReleaseOrphanedGrant_Local() {
+    DLockRequestProcessor.setDebugReleaseOrphanedGrant(true);
+    DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+    try {
+      startedThread2_testReleaseOrphanedGrant = false;
+      gotLockThread2_testReleaseOrphanedGrant = false;
+      releaseThread1_testReleaseOrphanedGrant = false;
+      
+      LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] 
+create lock service");
+      final String serviceName = getUniqueName();
+      final DistributedLockService service = 
+        DistributedLockService.create(serviceName, dlstSystem);
+      
+      // thread to get lock and wait and then unlock; it also switches on
+      // waitToProcessDLockResponse once it holds the lock
+      final Thread thread1 = new Thread(new Runnable() {
+        public void run() {
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] 
+get the lock");
+          assertTrue(service.lock("obj", -1, -1));
+          DLockRequestProcessor.setWaitToProcessDLockResponse(true);
+          startedThread1_testReleaseOrphanedGrant = true; // lets the test thread proceed past its yield loop
+          synchronized(Thread.currentThread()) { // the Thread object itself is the monitor; the test thread notifies on thread1 below
+            while (!releaseThread1_testReleaseOrphanedGrant) {
+              try {
+                Thread.currentThread().wait();
+              } catch (InterruptedException ignore) {fail("interrupted");}
+            }
+          }
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] 
+unlock the lock");
+          service.unlock("obj");
+        }
+      });
+      thread1.start();
+      while (!startedThread1_testReleaseOrphanedGrant) {
+        Thread.yield();
+      }
+      
+      // thread to interrupt lockInterruptibly call to cause zombie grant
+      final Thread thread2 = new Thread(new Runnable() {
+        public void run() {
+          try {
+            
+LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] call 
+lockInterruptibly");
+            startedThread2_testReleaseOrphanedGrant = true;
+            assertFalse(service.lockInterruptibly("obj", -1, -1));
+          } catch (InterruptedException expected) {
+            Thread.currentThread().interrupt(); // keep the interrupt status set on thread2
+          }
+        }
+      });
+      thread2.start();
+      while (!startedThread2_testReleaseOrphanedGrant) {
+        Thread.yield();
+      }
+          
+      // release first thread to unlock
+      LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] 
+release 1st thread");
+      sleep(500);
+      synchronized(thread1) {
+        releaseThread1_testReleaseOrphanedGrant = true;
+        thread1.notifyAll();
+      }
+      sleep(500);
+  
+      // while first thread is stuck on waitToProcessDLockResponse,
+      //   interrupt 2nd thread
+      // NOTE(review): presumably it is the response to thread2's pending
+      //   request that waitToProcessDLockResponse holds back - confirm
+      LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] 
+interrupt 2nd thread");
+      thread2.interrupt();
+      ThreadUtils.join(thread2, 20 * 1000);
+  
+      // release waitToProcessDLockResponse
+      LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] 
+process lock response");
+      sleep(500);
+      DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+      
+      // relock obj to make sure zombie release worked
+      LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Local] 
+verify lock not held");
+      assertTrue(service.lock("obj", 1000, -1));
+    }
+    finally {
+      DLockRequestProcessor.setDebugReleaseOrphanedGrant(false);
+      DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+    }
+  }
+  
+  static volatile Thread threadVM1_testReleaseOrphanedGrant_Remote; // in vm1: the async thread that holds "obj"; also used as the wait/notify monitor
+  static volatile Thread threadVM2_testReleaseOrphanedGrant_Remote; // in vm2: the async thread calling lockInterruptibly; target of the interrupt and join
+  static volatile boolean startedThreadVM1_testReleaseOrphanedGrant_Remote; // in vm1: set once the holder thread has the lock and is about to wait
+  static volatile boolean releaseThreadVM1_testReleaseOrphanedGrant_Remote; // in vm1: set to let the holder thread stop waiting and unlock
+  static volatile boolean unlockedThreadVM1_testReleaseOrphanedGrant_Remote; // in vm1: set after the holder thread has unlocked "obj"
+  static volatile boolean startedThreadVM2_testReleaseOrphanedGrant_Remote; // in vm2: set after the requester thread created its lock service
+  static volatile boolean gotLockThreadVM2_testReleaseOrphanedGrant_Remote; // no use is visible in this part of the file
+  public void testReleaseOrphanedGrant_Remote() { // remote orphaned-grant scenario, leaving vm2's lock service in place
+    doTestReleaseOrphanedGrant_Remote(false); // false: do not destroy the lock service in vm2
+  }
+  public void testReleaseOrphanedGrant_RemoteWithDestroy() { // same scenario, but vm2 also destroys its lock service after the interrupt
+    doTestReleaseOrphanedGrant_Remote(true); // true: destroy the lock service in vm2
+  }
+  /**
+   * Shared body of the remote orphaned-grant tests.
+   * <p>
+   * This VM creates the lock service and locks/unlocks "obj" once. vm1 then
+   * locks "obj" and waits to be released. vm2 switches on the
+   * {@code DLockRequestProcessor} debug hooks and requests "obj" with
+   * {@code lockInterruptibly}. After vm1 has been released (and unlocks), the
+   * requesting thread in vm2 is interrupted and joined, the
+   * {@code waitToProcessDLockResponse} hook is switched off, and this VM
+   * asserts that it can lock "obj" with a wait time of 1000.
+   * <p>
+   * The static coordination flags live in vm1 and vm2 and would otherwise
+   * keep their values from an earlier run of this method in the same JVM, so
+   * they are reset in both VMs before the scenario starts.
+   *
+   * @param destroyLockService when true, vm2 also destroys its lock service
+   *        after the requesting thread has been interrupted and joined
+   */
+  public void doTestReleaseOrphanedGrant_Remote(final boolean destroyLockService) {
+    final VM vm1 = Host.getHost(0).getVM(0);
+    final VM vm2 = Host.getHost(0).getVM(1);
+
+    // Start from a clean slate: a stale "release"/"started" flag left at true
+    // would let vm1 skip its wait and make the polling loops below pass
+    // immediately.
+    vm1.invoke(new SerializableRunnable("reset vm1 orphaned-grant state") {
+      public void run() {
+        threadVM1_testReleaseOrphanedGrant_Remote = null;
+        startedThreadVM1_testReleaseOrphanedGrant_Remote = false;
+        releaseThreadVM1_testReleaseOrphanedGrant_Remote = false;
+        unlockedThreadVM1_testReleaseOrphanedGrant_Remote = false;
+      }
+    });
+    vm2.invoke(new SerializableRunnable("reset vm2 orphaned-grant state") {
+      public void run() {
+        threadVM2_testReleaseOrphanedGrant_Remote = null;
+        startedThreadVM2_testReleaseOrphanedGrant_Remote = false;
+        gotLockThreadVM2_testReleaseOrphanedGrant_Remote = false;
+      }
+    });
+
+    try {
+      LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] create lock service");
+      final String serviceName = getUniqueName();
+      final DistributedLockService service =
+        DistributedLockService.create(serviceName, dlstSystem);
+
+      // lock and unlock to make sure this vm is grantor
+      assertTrue(service.lock("obj", -1, -1));
+      service.unlock("obj");
+
+      // thread to get lock and wait and then unlock
+      vm1.invokeAsync(new SerializableRunnable() {
+        public void run() {
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] get the lock");
+          threadVM1_testReleaseOrphanedGrant_Remote = Thread.currentThread();
+          connectDistributedSystem();
+          DistributedLockService service_vm1 =
+            DistributedLockService.create(serviceName, getSystem());
+          assertTrue(service_vm1.lock("obj", -1, -1));
+          // The Thread object doubles as the monitor the release step
+          // notifies on
+          synchronized(threadVM1_testReleaseOrphanedGrant_Remote) {
+            while (!releaseThreadVM1_testReleaseOrphanedGrant_Remote) {
+              try {
+                startedThreadVM1_testReleaseOrphanedGrant_Remote = true;
+                Thread.currentThread().wait();
+              } catch (InterruptedException ignore) {fail("interrupted");}
+            }
+          }
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] unlock the lock");
+          service_vm1.unlock("obj");
+          unlockedThreadVM1_testReleaseOrphanedGrant_Remote = true;
+        }
+      });
+      // block until vm1's holder thread has the lock and is waiting
+      vm1.invoke(new SerializableRunnable() {
+        public void run() {
+          while (!startedThreadVM1_testReleaseOrphanedGrant_Remote) {
+            Thread.yield();
+          }
+        }
+      });
+      sleep(500);
+
+      // thread to interrupt lockInterruptibly call to cause zombie grant
+      vm2.invokeAsync(new SerializableRunnable() {
+        public void run() {
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] call lockInterruptibly");
+          threadVM2_testReleaseOrphanedGrant_Remote = Thread.currentThread();
+          DistributedLockService service_vm2 =
+            DistributedLockService.create(serviceName, getSystem());
+          startedThreadVM2_testReleaseOrphanedGrant_Remote = true;
+          try {
+            DLockRequestProcessor.setDebugReleaseOrphanedGrant(true);
+            DLockRequestProcessor.setWaitToProcessDLockResponse(true);
+            assertFalse(service_vm2.lockInterruptibly("obj", -1, -1));
+          } catch (InterruptedException expected) {
+            // keep the interrupt status set on the requesting thread
+            Thread.currentThread().interrupt();
+          }
+        }
+      });
+      // block until vm2's requester thread has started
+      vm2.invoke(new SerializableRunnable() {
+        public void run() {
+          while (!startedThreadVM2_testReleaseOrphanedGrant_Remote) {
+            Thread.yield();
+          }
+        }
+      });
+      sleep(500);
+
+      // release first thread to unlock
+      vm1.invoke(new SerializableRunnable() {
+        public void run() {
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] release 1st thread");
+          synchronized(threadVM1_testReleaseOrphanedGrant_Remote) {
+            releaseThreadVM1_testReleaseOrphanedGrant_Remote = true;
+            threadVM1_testReleaseOrphanedGrant_Remote.notifyAll();
+          }
+        }
+      });
+      sleep(500); // lock is being released, grantor will grant lock to vm2
+
+      // while vm2's lock response is held back by waitToProcessDLockResponse,
+      //   interrupt 2nd thread
+      vm2.invoke(new SerializableRunnable() {
+        public void run() {
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] interrupt 2nd thread");
+          threadVM2_testReleaseOrphanedGrant_Remote.interrupt();
+          ThreadUtils.join(threadVM2_testReleaseOrphanedGrant_Remote,
+              5 * 60 * 1000);
+          if (destroyLockService) {
+            LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] destroy lock service");
+            DistributedLockService.destroy(serviceName);
+            assertNull(DistributedLockService.getServiceNamed(serviceName));
+          }
+        }
+      });
+      sleep(500); // grant is blocked while reply processor is being destroyed
+
+      // release waitToProcessDLockResponse
+      vm2.invoke(new SerializableRunnable() {
+        public void run() {
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] process lock response");
+          DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+        }
+      });
+      sleep(500); // process grant and send zombie release to grantor
+
+      // relock obj to make sure zombie release worked
+      LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] verify lock not held");
+      assertTrue(service.lock("obj", 1000, -1));
+    }
+    finally {
+      // always switch the debug hooks off again in vm2
+      vm2.invoke(new SerializableRunnable() {
+        public void run() {
+          LogWriterUtils.getLogWriter().info("[testReleaseOrphanedGrant_Remote] clean up DebugReleaseOrphanedGrant");
+          DLockRequestProcessor.setDebugReleaseOrphanedGrant(false);
+          DLockRequestProcessor.setWaitToProcessDLockResponse(false);
+        }
+      });
+    }
+  }
+  
+  public void testDestroyLockServiceAfterGrantResponse() throws Throwable {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    
+    final String serviceName = getUniqueName();
+
+    vm0.invoke(new SerializableRunnable("Create the grantor") {
+      
+      public void run() {
+        connectDistributedSystem();
+        final DistributedLockService service = 
+          DistributedLockService.create(serviceName, dlstSystem);
+          
+        // lock and unlock to make sure this vm is grantor
+        assertTrue(service.lock("obj", -1, -1));
+        service.unlock("obj");
+      }
+    });
+    
+    DistributionMessageObserver.setInstance(new DistributionMessageObserver() {
+
+      @Override
+      public void beforeProcessMessage(DistributionManager dm,
+          DistributionMessage message) {
+        if(message instanceof DLockResponseMessage) {
+          DistributedLockService.destroy(serviceName);  
+        }
+      }
+    });
+
+    connectDistributedSystem();
+    final DistributedLockService service = 
+      DistributedLockService.create(serviceName, dlstSystem);
+    try {
+      service.lock("obj", -1, -1);
+      fail("The lock service should have been destroyed");
+    } catch(LockServiceDestroyedException expected) {
+      //Do nothing
+    }
+
+    vm0.invoke(new SerializableRunnable("check to make sure the lock is not 
orphaned") {
+
+      public void run() {
+        final DistributedLockService service = 
+          DistributedLockService.getServiceNamed(serviceName);
+
+        // lock and unlock to make sure this vm is grantor
+        assertTrue(service.lock("obj", -1, -1));
+        service.unlock("obj");


<TRUNCATED>

Reply via email to