http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java 
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
new file mode 100644
index 0000000..e79eddc
--- /dev/null
+++ 
b/gemfire-core/src/test/java/com/gemstone/gemfire/cache30/SlowRecDUnitTest.java
@@ -0,0 +1,1453 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.cache30;
+
+import com.gemstone.gemfire.SystemFailure;
+import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.util.*;
+import com.gemstone.gemfire.distributed.internal.*;
+import com.gemstone.gemfire.internal.tcp.Connection;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+import dunit.*;
+
+import java.io.*;
+import java.util.*;
+
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to make sure slow receiver queuing is working
+ *
+ * @author darrel
+ * @since 4.2.1
+ */
+@Category(DistributedTest.class)
+@Ignore("Test was disabled by renaming to DisabledTest")
+public class SlowRecDUnitTest extends CacheTestCase {
+
+  public SlowRecDUnitTest(String name) {
+    super(name);
+  }
+
+  // this test has special config of its distributed system so
+  // the setUp and tearDown methods need to make sure we don't
+  // use the ds from previous test and that we don't leave ours around
+  // for the next test to use.
+  
+  public void setUp() throws Exception {
+    try {
+      disconnectAllFromDS();
+    } finally {
+      super.setUp();
+    }
+  }
+  public void tearDown2() throws Exception {
+    try {
+      super.tearDown2();
+    } finally {
+      disconnectAllFromDS();
+    }
+  }
+  
+  //////////////////////  Test Methods  //////////////////////
+
+  private VM getOtherVm() {
+    Host host = Host.getHost(0);
+    return host.getVM(0);
+  }
+
+  static protected Object lastCallback = null;
+
+  private void doCreateOtherVm(final Properties p, final boolean addListener) {
+    VM vm = getOtherVm();
+    vm.invoke(new CacheSerializableRunnable("create root") {
+        public void run2() throws CacheException {
+          getSystem(p);
+          createAckRegion(true, false);
+          AttributesFactory af = new AttributesFactory();
+          af.setScope(Scope.DISTRIBUTED_NO_ACK);
+          af.setDataPolicy(DataPolicy.REPLICATE);
+          if (addListener) {
+            CacheListener cl = new CacheListenerAdapter() {
+                public void afterUpdate(EntryEvent event) {
+                  // make the slow receiver event slower!
+                  try {Thread.sleep(500);} catch (InterruptedException 
shuttingDown) {fail("interrupted");}
+                }
+              };
+            af.setCacheListener(cl);
+          } else {
+            CacheListener cl = new CacheListenerAdapter() {
+                public void afterCreate(EntryEvent event) {
+//                   getLogWriter().info("afterCreate " + event.getKey());
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                  if (event.getKey().equals("sleepkey")) {
+                    int sleepMs = ((Integer)event.getNewValue()).intValue();
+//                     getLogWriter().info("sleepkey sleeping for " + sleepMs);
+                    try {Thread.sleep(sleepMs);} catch (InterruptedException 
ignore) {fail("interrupted");}
+                  }
+                }
+                public void afterUpdate(EntryEvent event) {
+//                   getLogWriter().info("afterUpdate " + event.getKey());
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                  if (event.getKey().equals("sleepkey")) {
+                    int sleepMs = ((Integer)event.getNewValue()).intValue();
+//                     getLogWriter().info("sleepkey sleeping for " + sleepMs);
+                    try {Thread.sleep(sleepMs);} catch (InterruptedException 
ignore) {fail("interrupted");}
+                  }
+                }
+                public void afterInvalidate(EntryEvent event) {
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                }
+                public void afterDestroy(EntryEvent event) {
+                  if (event.getCallbackArgument() != null) {
+                    lastCallback = event.getCallbackArgument();
+                  }
+                }
+              };
+            af.setCacheListener(cl);
+          }
+          Region r1 = createRootRegion("slowrec", af.create());
+          // place holder so we receive updates
+          r1.create("key", "value");
+        }
+      });
+  }
+  static protected final String CHECK_INVALID = "CHECK_INVALID";
+  
+  private void checkLastValueInOtherVm(final String lastValue, final Object 
lcb) {
+    VM vm = getOtherVm();
+    vm.invoke(new CacheSerializableRunnable("check last value") {
+        public void run2() throws CacheException {
+          Region r1 = getRootRegion("slowrec");
+          if (lcb != null) {
+            WaitCriterion ev = new WaitCriterion() {
+              public boolean done() {
+                return lcb.equals(lastCallback);
+              }
+              public String description() {
+                return "waiting for callback";
+              }
+            };
+            DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+            assertEquals(lcb, lastCallback);
+          }
+          if (lastValue == null) {
+            final Region r = r1;
+            WaitCriterion ev = new WaitCriterion() {
+              public boolean done() {
+                return r.getEntry("key") == null;
+              }
+              public String description() {
+                return "waiting for key to become null";
+              }
+            };
+            DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+            assertEquals(null, r1.getEntry("key"));
+          } else if (CHECK_INVALID.equals(lastValue)) {
+            // should be invalid
+            {
+              final Region r = r1;
+              WaitCriterion ev = new WaitCriterion() {
+                public boolean done() {
+                  Entry e = r.getEntry("key");
+                  if (e == null) {
+                    return false;
+                  }
+                  return e.getValue() == null;
+                }
+                public String description() {
+                  return "waiting for invalidate";
+                }
+              };
+              DistributedTestCase.waitForCriterion(ev, 50 * 1000, 200, true);
+//              assertNotNull(re);
+//              assertEquals(null, value);
+            }
+          } else {
+            {
+              int retryCount = 1000;
+              Region.Entry re = null;
+              Object value = null;
+              while (retryCount-- > 0) {
+                re = r1.getEntry("key");
+                if (re != null) {
+                  value = re.getValue();
+                  if (value != null && value.equals(lastValue)) {
+                    break;
+                  }
+                }
+                try {Thread.sleep(50);} catch (InterruptedException ignore) 
{fail("interrupted");}
+              }
+              assertNotNull(re);
+              assertNotNull(value);
+              assertEquals(lastValue, value);
+            }
+          }
+        }
+      });
+  }
+
+  private void forceQueueFlush() {
+    Connection.FORCE_ASYNC_QUEUE=false;
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return stats.getAsyncThreads() == 0;
+      }
+      public String description() {
+        return "Waiting for async threads to disappear";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 10 * 1000, 200, true);
+  }
+  
+  private void forceQueuing(final Region r) throws CacheException {
+    Connection.FORCE_ASYNC_QUEUE=true;
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+    r.put("forcekey", "forcevalue");
+    
+    // wait for the flusher to get its first flush in progress
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return stats.getAsyncQueueFlushesInProgress() != 0;
+      }
+      public String description() {
+        return "waiting for flushes to start";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+  }
+  
+  /**
+   * Make sure that noack puts to a receiver
+   * will eventually queue and then catch up.
+   */
+  public void testNoAck() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "1");
+    doCreateOtherVm(p, false);
+
+    int repeatCount = 2;
+    int count = 0;
+    while (repeatCount-- > 0) {
+      forceQueuing(r);
+      final Object key = "key";
+      long queuedMsgs = stats.getAsyncQueuedMsgs();
+      long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//      long conflatedMsgs = stats.getAsyncConflatedMsgs();
+      long queueSize = stats.getAsyncQueueSize();
+      String lastValue = "";
+      final long intialQueuedMsgs = queuedMsgs;
+      long curQueuedMsgs = queuedMsgs - dequeuedMsgs;
+      try {
+        // loop while we still have queued the initially queued msgs
+        // OR the cur # of queued msgs < 6
+        while (dequeuedMsgs < intialQueuedMsgs || curQueuedMsgs <= 6) {
+          String value = "count=" + count;
+          lastValue = value;
+          r.put(key, value);
+          count ++;
+          queueSize = stats.getAsyncQueueSize();
+          queuedMsgs = stats.getAsyncQueuedMsgs();
+          dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+          curQueuedMsgs = queuedMsgs - dequeuedMsgs;
+        }
+        getLogWriter().info("After " + count + " " + " puts slowrec mode 
kicked in by queuing " + queuedMsgs + " for a total size of " + queueSize);
+      } finally {
+        forceQueueFlush();
+      }
+      WaitCriterion ev = new WaitCriterion() {
+        public boolean done() {
+          return stats.getAsyncQueueSize() == 0;
+        }
+        public String description() {
+          return "Waiting for queues to empty";
+        }
+      };
+      final long start = System.currentTimeMillis();
+      DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
+      final long finish = System.currentTimeMillis();
+      getLogWriter().info("After " + (finish - start) + " ms async msgs where 
flushed. A total of " + stats.getAsyncDequeuedMsgs() + " were flushed. 
lastValue=" + lastValue);
+    
+      checkLastValueInOtherVm(lastValue, null);
+    }
+  }
+  /**
+   * Create a region named AckRegion with ACK scope
+   */
+  protected Region createAckRegion(boolean mirror, boolean conflate) throws 
CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    if (mirror) {
+      factory.setDataPolicy(DataPolicy.REPLICATE);
+    }
+    if (conflate) {
+      factory.setEnableAsyncConflation(true);
+    }
+    final Region r = createRootRegion("AckRegion", factory.create());
+    return r;
+  }
+  /**
+   * Make sure that noack puts to a receiver
+   * will eventually queue and then catch up with conflation
+   */
+  public void testNoAckConflation() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "1");
+    doCreateOtherVm(p, false);
+
+    forceQueuing(r);
+    final Object key = "key";
+    int count = 0;
+//    long queuedMsgs = stats.getAsyncQueuedMsgs();
+//    long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    final long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    long queueSize = stats.getAsyncQueueSize();
+    String lastValue = "";
+    final long intialDeQueuedMsgs = stats.getAsyncDequeuedMsgs();
+    long start = 0;
+    try {
+      while ((stats.getAsyncConflatedMsgs()-initialConflatedMsgs) < 1000) {
+        String value = "count=" + count;
+        lastValue = value;
+        r.put(key, value);
+        count ++;
+        //       getLogWriter().info("After " + count + " "
+        //                           + " puts queueSize=" + queueSize
+        //                           + "    queuedMsgs=" + queuedMsgs
+        //                           + "  dequeuedMsgs=" + dequeuedMsgs
+        //                           + " conflatedMsgs=" + conflatedMsgs);
+      }
+      start = System.currentTimeMillis();
+    } finally {
+      forceQueueFlush();
+    }
+//     queueSize = stats.getAsyncQueueSize();
+//     queuedMsgs = stats.getAsyncQueuedMsgs();
+
+//     getLogWriter().info("After " + count + " "
+//                         + " puts slowrec mode kicked in by queuing "
+//                         + queuedMsgs + " for a total size of " + queueSize
+//                         + " conflatedMsgs=" + conflatedMsgs
+//                         + " dequeuedMsgs=" + dequeuedMsgs);
+//     final long start = System.currentTimeMillis();
+//     while (stats.getAsyncQueuedMsgs() > stats.getAsyncDequeuedMsgs()) {
+//       try {Thread.sleep(100);} catch (InterruptedException ignore) {}
+//       queueSize = stats.getAsyncQueueSize();
+//       queuedMsgs = stats.getAsyncQueuedMsgs();
+//       dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//       conflatedMsgs = stats.getAsyncConflatedMsgs();
+//       getLogWriter().info("After sleeping"
+//                           + "     queueSize=" + queueSize
+//                           + "    queuedMsgs=" + queuedMsgs
+//                           + "  dequeuedMsgs=" + dequeuedMsgs
+//                           + " conflatedMsgs=" + conflatedMsgs);
+    final long finish = System.currentTimeMillis();
+    getLogWriter().info("After " + (finish - start) + " ms async msgs where 
flushed. A total of " + (stats.getAsyncDequeuedMsgs()-intialDeQueuedMsgs) + " 
were flushed. Leaving a queue size of " + stats.getAsyncQueueSize() + ". The 
lastValue was " + lastValue);
+    
+    checkLastValueInOtherVm(lastValue, null);
+  }
+  /**
+   * make sure ack does not hang
+   * make sure two ack updates do not conflate but are both queued
+   */
+  public void testAckConflation() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final Region ar = createAckRegion(false, true);
+    ar.create("ackKey", "ackValue");
+    
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "2");
+    doCreateOtherVm(p, false);
+
+    forceQueuing(r);
+    {
+      // make sure ack does not hang
+      // make sure two ack updates do not conflate but are both queued
+      long startQueuedMsgs = stats.getAsyncQueuedMsgs();
+      long startConflatedMsgs = stats.getAsyncConflatedMsgs();
+      Thread t = new Thread(new Runnable() {
+          public void run() {
+            ar.put("ackKey", "ackValue");
+          }
+        });
+      t.start();
+      Thread t2 = new Thread(new Runnable() {
+          public void run() {
+            ar.put("ackKey", "ackValue");
+          }
+        });
+      t2.start();
+      // give threads a chance to get queued
+      try {Thread.sleep(100);} catch (InterruptedException ignore) 
{fail("interrupted");}
+      forceQueueFlush();
+      DistributedTestCase.join(t, 2 * 1000, getLogWriter());
+      DistributedTestCase.join(t2, 2 * 1000, getLogWriter());
+      long endQueuedMsgs = stats.getAsyncQueuedMsgs();
+      long endConflatedMsgs = stats.getAsyncConflatedMsgs();
+      assertEquals(startConflatedMsgs, endConflatedMsgs);
+      // queue should be flushed by the time we get an ack
+      assertEquals(endQueuedMsgs, stats.getAsyncDequeuedMsgs());
+      assertEquals(startQueuedMsgs+2, endQueuedMsgs);
+    }
+  }
+  /**
+   * Make sure that only sequences of updates are conflated
+   * Also checks that sending to a conflating region and non-conflating region
+   * does the correct thing.
+   * Test disabled because it intermittently fails due to race conditions
+   * in test. This has been fixed in congo's tests. See bug 35357.
+   */
+  public void _disabled_testConflationSequence() throws CacheException {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    factory.setEnableAsyncConflation(false);
+    final Region noConflate = createRootRegion("noConflate", factory.create());
+    final DMStats stats = getSystem().getDistributionManager().getStats();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "1");
+    doCreateOtherVm(p, false);
+    {
+      VM vm = getOtherVm();
+      vm.invoke(new CacheSerializableRunnable("create noConflate") {
+          public void run2() throws CacheException {
+            AttributesFactory af = new AttributesFactory();
+            af.setScope(Scope.DISTRIBUTED_NO_ACK);
+            af.setDataPolicy(DataPolicy.REPLICATE);
+            createRootRegion("noConflate", af.create());
+          }
+        });
+    }
+
+    // now make sure update+destroy does not conflate
+    final Object key = "key";      
+    getLogWriter().info("[testConflationSequence] about to force queuing");
+    forceQueuing(r);
+
+    int count = 0;
+    String value = "";
+    String lastValue = value;
+    Object mylcb = null;
+    long initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    long initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//    long dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    int endCount = count+60;
+
+    getLogWriter().info("[testConflationSequence] about to build up queue");
+    long begin = System.currentTimeMillis();
+    while (count < endCount) {
+      value = "count=" + count;
+      lastValue = value;
+      r.create(key, value);
+      count ++;
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      mylcb = value;
+      r.destroy(key, mylcb);
+      count ++;
+      lastValue = null;
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+    forceQueueFlush();
+    checkLastValueInOtherVm(lastValue, mylcb);
+
+    // now make sure create+update+localDestroy does not conflate
+    getLogWriter().info("[testConflationSequence] force queuing 
create-update-destroy");
+    forceQueuing(r);
+    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    endCount = count + 40;
+    
+    getLogWriter().info("[testConflationSequence] create-update-destroy");
+    begin = System.currentTimeMillis();
+    while (count < endCount) {
+      value = "count=" + count;
+      lastValue = value;
+      r.create(key, value);
+      count++;
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      r.localDestroy(key);
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+    forceQueueFlush();
+    checkLastValueInOtherVm(lastValue, null);
+
+    // now make sure update+invalidate does not conflate
+    getLogWriter().info("[testConflationSequence] force queuing 
update-invalidate");
+    forceQueuing(r);
+    initialConflatedMsgs = stats.getAsyncConflatedMsgs();
+//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    value = "count=" + count;
+    lastValue = value;
+    r.create(key, value);
+    count++;
+//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    endCount = count + 40;
+
+    getLogWriter().info("[testConflationSequence] update-invalidate");
+    begin = System.currentTimeMillis();
+    while (count < endCount) {
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      r.invalidate(key);
+      count ++;
+      lastValue = CHECK_INVALID;
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+    assertEquals(initialConflatedMsgs, stats.getAsyncConflatedMsgs());
+    forceQueueFlush();
+    getLogWriter().info("[testConflationSequence] assert other vm");
+    checkLastValueInOtherVm(lastValue, null);
+
+    r.destroy(key);
+
+    // now make sure updates to a conflating region are conflated even while
+    // updates to a non-conflating are not.
+    getLogWriter().info("[testConflationSequence] conflate & no-conflate 
regions");
+    forceQueuing(r);
+    final int initialAsyncSocketWrites = stats.getAsyncSocketWrites();
+//    initialDequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    
+    value = "count=" + count;
+    lastValue = value;
+    long conflatedMsgs = stats.getAsyncConflatedMsgs();
+    long queuedMsgs = stats.getAsyncQueuedMsgs();
+    r.create(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    r.put(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    noConflate.create(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    noConflate.put(key, value);
+    queuedMsgs++;
+    assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+    assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+    count++;
+//    dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+    endCount = count + 80;
+
+    begin = System.currentTimeMillis();
+    getLogWriter().info("[testConflationSequence:DEBUG] count=" + count
+                        + " queuedMsgs=" + stats.getAsyncQueuedMsgs()
+                        + " conflatedMsgs=" + stats.getAsyncConflatedMsgs()
+                        + " dequeuedMsgs=" + stats.getAsyncDequeuedMsgs()
+                        + " asyncSocketWrites=" + stats.getAsyncSocketWrites()
+                        );
+    while (count < endCount) {
+      // make sure we continue to have a flush in progress
+      assertEquals(1, stats.getAsyncThreads());
+      assertEquals(1, stats.getAsyncQueues());
+      assertTrue(stats.getAsyncQueueFlushesInProgress() > 0);
+      // make sure we are not completing any flushing while this loop is in 
progress
+      assertEquals(initialAsyncSocketWrites, stats.getAsyncSocketWrites());
+      value = "count=" + count;
+      lastValue = value;
+      r.put(key, value);
+      count ++;
+      // make sure it was conflated and not queued
+      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+      conflatedMsgs++;
+      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+      noConflate.put(key, value);
+      // make sure it was queued and not conflated
+      queuedMsgs++;
+      assertEquals(queuedMsgs, stats.getAsyncQueuedMsgs());
+      assertEquals(conflatedMsgs, stats.getAsyncConflatedMsgs());
+//      dequeuedMsgs = stats.getAsyncDequeuedMsgs();
+      assertTrue(System.currentTimeMillis() < begin+1000*60*2);
+    }
+
+    forceQueueFlush();
+    getLogWriter().info("[testConflationSequence] assert other vm");
+    checkLastValueInOtherVm(lastValue, null);
+  }
+  /**
+   * Make sure that exceeding the queue size limit causes a disconnect.
+   */
+  public void testSizeDisconnect() throws CacheException {
+    final String expected = 
+      "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced 
disconnect sent to" +
+      "||java.io.IOException: Broken pipe";
+    final String addExpected = 
+      "<ExpectedException action=add>" + expected + "</ExpectedException>";
+    final String removeExpected = 
+      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    // set others before vm0 connects
+    final Set others = dm.getOtherDistributionManagerIds();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-max-queue-size", "1"); // 1 meg
+    doCreateOtherVm(p, false);
+
+    
+    final Object key = "key";
+    final int VALUE_SIZE = 1024 * 100; // .1M async-max-queue-size should give 
us 10 of these 100K msgs before queue full
+    final byte[] value = new byte[VALUE_SIZE];
+    int count = 0;
+    forceQueuing(r);
+    long queuedMsgs = stats.getAsyncQueuedMsgs();
+    long queueSize = stats.getAsyncQueueSize();
+    
+    getCache().getLogger().info(addExpected);
+    try {    
+      while (stats.getAsyncQueueSizeExceeded() == 0 && 
stats.getAsyncQueueTimeouts() == 0) {
+        r.put(key, value);
+        count ++;
+        if (stats.getAsyncQueueSize() > 0) {
+          queuedMsgs = stats.getAsyncQueuedMsgs();
+          queueSize = stats.getAsyncQueueSize();
+        }
+        if (count > 100) {
+          fail("should have exceeded max-queue-size by now");
+        }
+      }
+      getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts 
slowrec mode kicked in but the queue filled when its size reached " + queueSize 
+ " with " + queuedMsgs + " msgs");
+      // make sure we lost a connection to vm0
+      WaitCriterion ev = new WaitCriterion() {
+        public boolean done() {
+          return dm.getOtherDistributionManagerIds().size() <= others.size()
+              && stats.getAsyncQueueSize() == 0;
+        }
+        public String description() {
+          return "waiting for connection loss";
+        }
+      };
+      DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
+    }
+    finally {
+      forceQueueFlush();
+      getCache().getLogger().info(removeExpected);
+    }
+    assertEquals(others, dm.getOtherDistributionManagerIds());
+    assertEquals(0, stats.getAsyncQueueSize());
+  }
+  /**
+   * Make sure that exceeding the async-queue-timeout causes a disconnect.<p>
+   * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
+   * in build.xml in the splitbrainNov07 branch.  It had been disabled since
+   * June 2006 due to hangs.  Some of the tests, like this one, still need
+   * work because the periodically (some quite often) fail.
+   */
+  public void donottestTimeoutDisconnect() throws CacheException {
+    final String expected = 
+      "com.gemstone.gemfire.internal.tcp.ConnectionException: Forced 
disconnect sent to" +
+      "||java.io.IOException: Broken pipe";
+    final String addExpected = 
+      "<ExpectedException action=add>" + expected + "</ExpectedException>";
+    final String removeExpected = 
+      "<ExpectedException action=remove>" + expected + "</ExpectedException>";
+      
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    // set others before vm0 connects
+    final Set others = dm.getOtherDistributionManagerIds();
+
+    // create receiver in vm0 with queuing enabled
+    Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-queue-timeout", "500"); // 500 ms
+    doCreateOtherVm(p, true);
+
+    
+    final Object key = "key";
+    final int VALUE_SIZE = 1024; // 1k
+    final byte[] value = new byte[VALUE_SIZE];
+    int count = 0;
+    long queuedMsgs = stats.getAsyncQueuedMsgs();
+    long queueSize = stats.getAsyncQueueSize();
+    final long timeoutLimit = System.currentTimeMillis() + 5000;
+
+    getCache().getLogger().info(addExpected);
+    try {    
+      while (stats.getAsyncQueueTimeouts() == 0) {
+        r.put(key, value);
+        count ++;
+        if (stats.getAsyncQueueSize() > 0) {
+          queuedMsgs = stats.getAsyncQueuedMsgs();
+          queueSize = stats.getAsyncQueueSize();
+        }
+        if (System.currentTimeMillis() > timeoutLimit) {
+          fail("should have exceeded async-queue-timeout by now");
+        }
+      }
+      getLogWriter().info("After " + count + " " + VALUE_SIZE + " byte puts 
slowrec mode kicked in but the queue filled when its size reached " + queueSize 
+ " with " + queuedMsgs + " msgs");
+      // make sure we lost a connection to vm0
+      WaitCriterion ev = new WaitCriterion() {
+        public boolean done() {
+          if (dm.getOtherDistributionManagerIds().size() > others.size()) {
+            return false;
+          }
+          return stats.getAsyncQueueSize() == 0;
+        }
+        public String description() {
+          return "waiting for departure";
+        }
+      };
+      DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+    }
+    finally {
+      getCache().getLogger().info(removeExpected);
+    }
+    assertEquals(others, dm.getOtherDistributionManagerIds());
+    assertEquals(0, stats.getAsyncQueueSize());
+  }
+
+  // static helper methods ---------------------------------------------------
+  
+  private static final String KEY_SLEEP = "KEY_SLEEP";
+  private static final String KEY_WAIT = "KEY_WAIT";
+  private static final String KEY_DISCONNECT = "KEY_DISCONNECT";
+  
+  protected final static int CALLBACK_CREATE = 0;
+  protected final static int CALLBACK_UPDATE = 1;
+  protected final static int CALLBACK_INVALIDATE = 2;
+  protected final static int CALLBACK_DESTROY = 3;
+  protected final static int CALLBACK_REGION_INVALIDATE = 4;
+  
+  protected final static Integer CALLBACK_CREATE_INTEGER = new 
Integer(CALLBACK_CREATE);
+  protected final static Integer CALLBACK_UPDATE_INTEGER = new 
Integer(CALLBACK_UPDATE);
+  protected final static Integer CALLBACK_INVALIDATE_INTEGER = new 
Integer(CALLBACK_INVALIDATE);
+  protected final static Integer CALLBACK_DESTROY_INTEGER = new 
Integer(CALLBACK_DESTROY);
+  protected final static Integer CALLBACK_REGION_INVALIDATE_INTEGER = new 
Integer(CALLBACK_REGION_INVALIDATE);
+
+  private static class CallbackWrapper {
+    public final Object callbackArgument;
+    public final  int callbackType;
+    public CallbackWrapper(Object callbackArgument, int callbackType) {
+      this.callbackArgument = callbackArgument;
+      this.callbackType = callbackType;
+    }
+    public String toString() {
+      return "CallbackWrapper: " + callbackArgument.toString() + " of type " + 
callbackType;
+    }
+  }
+  
+  protected static class ControlListener extends CacheListenerAdapter {
+    public final LinkedList callbackArguments = new LinkedList();
+    public final LinkedList callbackTypes = new LinkedList();
+    public final Object CONTROL_LOCK = new Object();
+    
+    public void afterCreate(EntryEvent event) {
+      getLogWriter().info(event.getRegion().getName() + " afterCreate " + 
event.getKey());
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_CREATE));
+          this.callbackTypes.add(CALLBACK_CREATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+      processEvent(event);
+    }
+    public void afterUpdate(EntryEvent event) {
+      getLogWriter().info(event.getRegion().getName() + " afterUpdate " + 
event.getKey());
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), CALLBACK_UPDATE));
+          this.callbackTypes.add(CALLBACK_UPDATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+      processEvent(event);
+    }
+    public void afterInvalidate(EntryEvent event) {
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), 
CALLBACK_INVALIDATE));
+          this.callbackTypes.add(CALLBACK_INVALIDATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+    }
+    public void afterDestroy(EntryEvent event) {
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), 
CALLBACK_DESTROY));
+          this.callbackTypes.add(CALLBACK_DESTROY_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+    }
+    public void afterRegionInvalidate(RegionEvent event) {
+      synchronized(this.CONTROL_LOCK) {
+        if (event.getCallbackArgument() != null) {
+          this.callbackArguments.add(
+            new CallbackWrapper(event.getCallbackArgument(), 
CALLBACK_REGION_INVALIDATE));
+          this.callbackTypes.add(CALLBACK_REGION_INVALIDATE_INTEGER);
+          this.CONTROL_LOCK.notifyAll();
+        }
+      }
+    }
+    private void processEvent(EntryEvent event) {
+      if (event.getKey().equals(KEY_SLEEP)) {
+        processSleep(event);
+      }
+      else if (event.getKey().equals(KEY_WAIT)) {
+        processWait(event);
+      }
+      else if (event.getKey().equals(KEY_DISCONNECT)) {
+        processDisconnect(event);
+      }
+    }
+    private void processSleep(EntryEvent event) {
+      int sleepMs = ((Integer)event.getNewValue()).intValue();
+      getLogWriter().info("[processSleep] sleeping for " + sleepMs);
+      try {
+        Thread.sleep(sleepMs);
+      } catch (InterruptedException ignore) {fail("interrupted");}
+    }
+    private void processWait(EntryEvent event) {
+      int sleepMs = ((Integer)event.getNewValue()).intValue();
+      getLogWriter().info("[processWait] waiting for " + sleepMs);
+      synchronized(this.CONTROL_LOCK) {
+        try {
+          this.CONTROL_LOCK.wait(sleepMs);
+        } catch (InterruptedException ignore) {return;}
+      }
+    }
+    private void processDisconnect(EntryEvent event) {
+      getLogWriter().info("[processDisconnect] disconnecting");
+      disconnectFromDS();
+    }
+  };
+
+  /**
+   * Make sure a multiple no ack regions conflate properly.
+   * [bruce] disabled when use of this dunit test class was reenabled in
+   * the splitbrainNov07 branch.  The class had been disabled since
+   * June 2006 r13222 in the trunk.  This test is failing because conflation
+   * isn't kicking in for some reason.
+   */
+  public void donottestMultipleRegionConflation() throws Throwable {
+    try {
+      doTestMultipleRegionConflation();
+    }
+    catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      getLogWriter().error("Encountered exception: ", t);
+      throw t;
+    }
+    finally {
+      // make sure other vm was notified even if test failed
+      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+        public void run() {
+          
synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+            
doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
+          }
+        }
+      });
+    }
+  }
+  protected static ControlListener doTestMultipleRegionConflation_R1_Listener;
+  protected static ControlListener doTestMultipleRegionConflation_R2_Listener;
+  private void doTestMultipleRegionConflation() throws Exception {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r1 = createRootRegion("slowrec1", factory.create());
+    final Region r2 = createRootRegion("slowrec2", factory.create());
+    
+    assertTrue(getSystem().isConnected());
+    assertNotNull(r1);
+    assertFalse(r1.isDestroyed());
+    assertNotNull(getCache());
+    assertNotNull(getCache().getRegion("slowrec1"));
+    assertNotNull(r2);
+    assertFalse(r2.isDestroyed());
+    assertNotNull(getCache());
+    assertNotNull(getCache().getRegion("slowrec2"));
+    
+    final DM dm = getSystem().getDistributionManager();
+    final Serializable controllerVM = dm.getDistributionManagerId();
+    final DMStats stats = dm.getStats();
+    final int millisToWait = 1000 * 60 * 5; // 5 minutes
+    
+    // set others before vm0 connects
+    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+
+    // create receiver in vm0 with queuing enabled
+    final Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-queue-timeout", "86400000"); // max value
+    p.setProperty("async-max-queue-size", "1024"); // max value
+
+    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+      public void run2() throws CacheException {
+        getSystem(p);
+        
+        DM dm = getSystem().getDistributionManager();
+        assertTrue(dm.getDistributionManagerIds().contains(controllerVM));
+        
+        AttributesFactory af = new AttributesFactory();
+        af.setScope(Scope.DISTRIBUTED_NO_ACK);
+        af.setDataPolicy(DataPolicy.REPLICATE);
+        
+        doTestMultipleRegionConflation_R1_Listener = new ControlListener();
+        af.setCacheListener(doTestMultipleRegionConflation_R1_Listener);
+        createRootRegion("slowrec1", af.create());
+        
+        doTestMultipleRegionConflation_R2_Listener = new ControlListener();
+        af.setCacheListener(doTestMultipleRegionConflation_R2_Listener);
+        createRootRegion("slowrec2", af.create());
+      }
+    });
+    
+    // put vm0 cache listener into wait
+    getLogWriter().info("[doTestMultipleRegionConflation] about to put vm0 
into wait");
+    r1.put(KEY_WAIT, new Integer(millisToWait));
+
+    // build up queue size
+    getLogWriter().info("[doTestMultipleRegionConflation] building up queue 
size...");
+    final Object key = "key";
+    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+    final int VALUE_SIZE = socketBufferSize*3;
+    //final int VALUE_SIZE = 1024 * 1024  ; // 1 MB
+    final byte[] value = new byte[VALUE_SIZE];
+
+    int count = 0;
+    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+      count++;
+      r1.put(key, value);
+    }
+    
+    getLogWriter().info("[doTestMultipleRegionConflation] After " + 
+      count + " puts of size " + VALUE_SIZE + 
+      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+    // put values that will be asserted
+    final Object key1 = "key1";
+    final Object key2 = "key2";
+    Object putKey = key1;
+    boolean flag = true;
+    for (int i = 0; i < 30; i++) {
+      if (i == 10) putKey = key2;
+      if (flag) {
+        if (i == 6) {
+          r1.invalidate(putKey, new Integer(i));
+        } else if (i == 24) {
+          r1.invalidateRegion(new Integer(i));
+        } else {
+          r1.put(putKey, value, new Integer(i));
+        }
+      } else {
+        if (i == 15) {
+          r2.destroy(putKey, new Integer(i));
+        } else {
+          r2.put(putKey, value, new Integer(i));
+        }
+      }
+      flag = !flag;
+    }
+    
+    // r1: key1, 0, create
+    // r1: key1, 4, update
+    // r1: key1, 6, invalidate
+    // r1: key1, 8, update
+    
+    // r1: key2, 10, create
+    // r1:       24, invalidateRegion
+    // r1: key2, 28, update
+
+    // r2: key1, 1, create
+    // r2: key1, 9, update
+    
+    // r2: key2, 11, create
+    // r2: key2, 13, update
+    // r2: key2, 15, destroy
+    // r2: key2, 17, create
+    // r2: key2, 29, update
+    
+    final int[] r1ExpectedArgs = new int[] { 0, 4, 6, 8, 10, 24, 28 }; 
+    final int[] r1ExpectedTypes = new int[] /* 0, 1, 2, 1, 0, 4, 1 */
+      { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_INVALIDATE, CALLBACK_UPDATE,
+        CALLBACK_CREATE, CALLBACK_REGION_INVALIDATE, CALLBACK_UPDATE }; 
+    
+    final int[] r2ExpectedArgs = new int[] { 1, 9, 11, 13, 15, 17, 29 };
+    final int[] r2ExpectedTypes = new int[] 
+      { CALLBACK_CREATE, CALLBACK_UPDATE, CALLBACK_CREATE, CALLBACK_UPDATE,
+        CALLBACK_DESTROY, CALLBACK_CREATE, CALLBACK_UPDATE }; 
+
+    // send notify to vm0
+    getLogWriter().info("[doTestMultipleRegionConflation] wake up vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+      public void run() {
+        synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+          doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.notifyAll();
+        }
+      }
+    });
+    
+    // wait for queue to be flushed
+    getLogWriter().info("[doTestMultipleRegionConflation] wait for vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
+      public void run() {
+        try {
+          
synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+            while 
(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size() < 
r1ExpectedArgs.length) {
+              
doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK.wait(millisToWait);
+            }
+          }
+          
synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
+            while 
(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size() < 
r2ExpectedArgs.length) {
+              
doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK.wait(millisToWait);
+            }
+          }
+        } catch (InterruptedException ignore) {fail("interrupted");}
+      }
+    });
+    
+    // assert values on both listeners
+    getLogWriter().info("[doTestMultipleRegionConflation] assert callback 
arguments");
+    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
+      public void run() {
+        synchronized(doTestMultipleRegionConflation_R1_Listener.CONTROL_LOCK) {
+          
getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackArguments="
 + doTestMultipleRegionConflation_R1_Listener.callbackArguments);
+          
getLogWriter().info("doTestMultipleRegionConflation_R1_Listener.callbackTypes=" 
+ doTestMultipleRegionConflation_R1_Listener.callbackTypes);
+          
assertEquals(doTestMultipleRegionConflation_R1_Listener.callbackArguments.size(),
+                       
doTestMultipleRegionConflation_R1_Listener.callbackTypes.size());
+          int i = 0;
+          for (Iterator iter = 
doTestMultipleRegionConflation_R1_Listener.callbackArguments.iterator(); 
iter.hasNext();) {
+            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
+            assertEquals(new Integer(r1ExpectedArgs[i]), 
+              wrapper.callbackArgument);
+            assertEquals(new Integer(r1ExpectedTypes[i]), 
+              doTestMultipleRegionConflation_R1_Listener.callbackTypes.get(i));
+            i++;
+          }
+        }
+        synchronized(doTestMultipleRegionConflation_R2_Listener.CONTROL_LOCK) {
+          
getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackArguments="
 + doTestMultipleRegionConflation_R2_Listener.callbackArguments);
+          
getLogWriter().info("doTestMultipleRegionConflation_R2_Listener.callbackTypes=" 
+ doTestMultipleRegionConflation_R2_Listener.callbackTypes);
+          
assertEquals(doTestMultipleRegionConflation_R2_Listener.callbackArguments.size(),
+                       
doTestMultipleRegionConflation_R2_Listener.callbackTypes.size());
+          int i = 0;
+          for (Iterator iter = 
doTestMultipleRegionConflation_R2_Listener.callbackArguments.iterator(); 
iter.hasNext();) {
+            CallbackWrapper wrapper = (CallbackWrapper) iter.next();
+            assertEquals(new Integer(r2ExpectedArgs[i]), 
+              wrapper.callbackArgument);
+            assertEquals(new Integer(r2ExpectedTypes[i]), 
+              doTestMultipleRegionConflation_R2_Listener.callbackTypes.get(i));
+            i++;
+          }
+        }
+      }
+    });
+  }
+
+  /**
+   * Make sure a disconnect causes queue memory to be released.
+   */
+  public void testDisconnectCleanup() throws Throwable {
+    try {
+      doTestDisconnectCleanup();
+    }
+    catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      getLogWriter().error("Encountered exception: ", t);
+      throw t;
+    }
+    finally {
+      // make sure other vm was notified even if test failed
+      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+        public void run() {
+          synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
+            doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
+          }
+        }
+      });
+    }
+  }
+  protected static ControlListener doTestDisconnectCleanup_Listener;
+  private void doTestDisconnectCleanup() throws Exception {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    // set others before vm0 connects
+    final Set others = dm.getOtherDistributionManagerIds();
+    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+    final int initialQueues = stats.getAsyncQueues();
+
+    // create receiver in vm0 with queuing enabled
+    final Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", "5");
+    p.setProperty("async-queue-timeout", "86400000"); // max value
+    p.setProperty("async-max-queue-size", "1024"); // max value
+    
+    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+      public void run2() throws CacheException {
+        getSystem(p);
+        AttributesFactory af = new AttributesFactory();
+        af.setScope(Scope.DISTRIBUTED_NO_ACK);
+        af.setDataPolicy(DataPolicy.REPLICATE);
+        
+        doTestDisconnectCleanup_Listener = new ControlListener();
+        af.setCacheListener(doTestDisconnectCleanup_Listener);
+        createRootRegion("slowrec", af.create());
+      }
+    });
+
+    // put vm0 cache listener into wait
+    getLogWriter().info("[testDisconnectCleanup] about to put vm0 into wait");
+    int millisToWait = 1000 * 60 * 5; // 5 minutes
+    r.put(KEY_WAIT, new Integer(millisToWait));
+    r.put(KEY_DISCONNECT, KEY_DISCONNECT);
+
+    // build up queue size
+    getLogWriter().info("[testDisconnectCleanup] building up queue size...");
+    final Object key = "key";
+    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+    final int VALUE_SIZE = socketBufferSize*3;
+    //final int VALUE_SIZE = 1024 * 1024  ; // 1 MB
+    final byte[] value = new byte[VALUE_SIZE];
+
+    int count = 0;
+    final long abortMillis = System.currentTimeMillis() + millisToWait;
+    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+      count++;
+      r.put(key, value);
+      assertFalse(System.currentTimeMillis() >= abortMillis);
+    }
+    
+    getLogWriter().info("[testDisconnectCleanup] After " + 
+      count + " puts of size " + VALUE_SIZE + 
+      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+    while (stats.getAsyncQueuedMsgs() < 10 ||
+           stats.getAsyncQueueSize() < VALUE_SIZE*10) {
+      count++;
+      r.put(key, value);
+      assertFalse(System.currentTimeMillis() >= abortMillis);
+    }
+    assertTrue(stats.getAsyncQueuedMsgs() >= 10);
+
+    while (stats.getAsyncQueues() < 1) {
+      pause(100);
+      assertFalse(System.currentTimeMillis() >= abortMillis);
+    }
+    
+    getLogWriter().info("[testDisconnectCleanup] After " + 
+      count + " puts of size " + VALUE_SIZE + " queue size has reached " + 
+      stats.getAsyncQueueSize() + " bytes and number of queues is " + 
+      stats.getAsyncQueues() + ".");
+
+    assertTrue(stats.getAsyncQueueSize() >= (VALUE_SIZE*5));
+    assertEquals(initialQueues+1, stats.getAsyncQueues());
+
+    // assert vm0 is still connected
+    assertTrue(dm.getOtherDistributionManagerIds().size() > others.size());
+    
+    // send notify to vm0
+    getLogWriter().info("[testDisconnectCleanup] wake up vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+      public void run() {
+        synchronized(doTestDisconnectCleanup_Listener.CONTROL_LOCK) {
+          doTestDisconnectCleanup_Listener.CONTROL_LOCK.notifyAll();
+        }
+      }
+    });
+    
+    // make sure we lost a connection to vm0
+    getLogWriter().info("[testDisconnectCleanup] wait for vm0 to disconnect");
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return dm.getOtherDistributionManagerIds().size() <= others.size();
+      }
+      public String description() {
+        return "waiting for disconnect";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+    assertEquals(others, dm.getOtherDistributionManagerIds());
+    
+    // check free memory... perform wait loop with System.gc
+    getLogWriter().info("[testDisconnectCleanup] wait for queue cleanup");
+    ev = new WaitCriterion() {
+      public boolean done() {
+        if (stats.getAsyncQueues() <= initialQueues) {
+          return true;
+        }
+        Runtime.getRuntime().gc();
+        return false;
+      }
+      public String description() {
+        return "waiting for queue cleanup";
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 2 * 1000, 200, true);
+//    getLogWriter().info("[testDisconnectCleanup] initialQueues=" + 
+//      initialQueues + " asyncQueues=" + stats.getAsyncQueues());
+    assertEquals(initialQueues, stats.getAsyncQueues());
+  }
+
+  /**
+   * Make sure a disconnect causes queue memory to be released.<p>
+     * [bruce] This test was disabled when the SlowRecDUnitTest was re-enabled
+   * in build.xml in the splitbrainNov07 branch.  It had been disabled since
+   * June 2006 due to hangs.  Some of the tests, like this one, still need
+   * work because the periodically (some quite often) fail.
+ */
+  public void donottestPartialMessage() throws Throwable {
+    try {
+      doTestPartialMessage();
+    }
+    catch (VirtualMachineError e) {
+      SystemFailure.initiateFailure(e);
+      throw e;
+    }
+    catch (Throwable t) {
+      getLogWriter().error("Encountered exception: ", t);
+      throw t;
+    }
+    finally {
+      // make sure other vm was notified even if test failed
+      getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+        public void run() {
+          synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+            doTestPartialMessage_Listener.CONTROL_LOCK.notifyAll();
+          }
+        }
+      });
+    }
+  }
+  protected static ControlListener doTestPartialMessage_Listener;
+  private void doTestPartialMessage() throws Exception {
+    final AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_NO_ACK);
+    factory.setEnableAsyncConflation(true);
+    final Region r = createRootRegion("slowrec", factory.create());
+    final DM dm = getSystem().getDistributionManager();
+    final DMStats stats = dm.getStats();
+    
+    // set others before vm0 connects
+//    final Set others = dm.getOtherDistributionManagerIds();
+    long initialQueuedMsgs = stats.getAsyncQueuedMsgs();
+//    int initialQueues = stats.getAsyncQueues();
+
+    // create receiver in vm0 with queuing enabled
+    final Properties p = new Properties();
+    p.setProperty("async-distribution-timeout", String.valueOf(1000*4)); // 4 
sec
+    p.setProperty("async-queue-timeout", "86400000"); // max value
+    p.setProperty("async-max-queue-size", "1024"); // max value
+    
+    getOtherVm().invoke(new CacheSerializableRunnable("Create other vm") {
+      public void run2() throws CacheException {
+        getSystem(p);
+        AttributesFactory af = new AttributesFactory();
+        af.setScope(Scope.DISTRIBUTED_NO_ACK);
+        af.setDataPolicy(DataPolicy.REPLICATE);
+        
+        doTestPartialMessage_Listener = new ControlListener();
+        af.setCacheListener(doTestPartialMessage_Listener);
+        createRootRegion("slowrec", af.create());
+      }
+    });
+
+    // put vm0 cache listener into wait
+    getLogWriter().info("[testPartialMessage] about to put vm0 into wait");
+    final int millisToWait = 1000 * 60 * 5; // 5 minutes
+    r.put(KEY_WAIT, new Integer(millisToWait));
+
+    // build up queue size
+    getLogWriter().info("[testPartialMessage] building up queue size...");
+    final Object key = "key";
+    final int socketBufferSize = getSystem().getConfig().getSocketBufferSize();
+    final int VALUE_SIZE = socketBufferSize*3;
+    //1024 * 20; // 20 KB
+    final byte[] value = new byte[VALUE_SIZE];
+
+    int count = 0;
+    while (stats.getAsyncQueuedMsgs() == initialQueuedMsgs) {
+      count++;
+      r.put(key, value, new Integer(count));
+    }
+    
+    final int partialId = count;
+    assertEquals(0, stats.getAsyncConflatedMsgs());
+    
+    getLogWriter().info("[testPartialMessage] After " + 
+      count + " puts of size " + VALUE_SIZE + 
+      " slowrec mode kicked in with queue size=" + stats.getAsyncQueueSize());
+
+    pause(2000);
+      
+    // conflate 10 times
+    while (stats.getAsyncConflatedMsgs() < 10) {
+      count++;
+      r.put(key, value, new Integer(count));
+      if (count == partialId+1) {
+//        long begin = System.currentTimeMillis();
+//        while (stats.getAsyncQueues() < 1) {
+//          pause(100);
+//          assertFalse(System.currentTimeMillis() > begin+1000*10);
+//        }
+        assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
+        assertEquals(0, stats.getAsyncConflatedMsgs());
+      } else if (count == partialId+2) {
+        assertEquals(initialQueuedMsgs+2, stats.getAsyncQueuedMsgs());
+        assertEquals(1, stats.getAsyncConflatedMsgs());
+      }
+    }
+    
+    final int conflateId = count;
+    
+    final int[] expectedArgs = { partialId, conflateId };
+
+    // send notify to vm0
+    getLogWriter().info("[testPartialMessage] wake up vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wake up other vm") {
+      public void run() {
+        synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+          doTestPartialMessage_Listener.CONTROL_LOCK.notify();
+        }
+      }
+    });
+    
+    // wait for queue to be flushed
+    getLogWriter().info("[testPartialMessage] wait for vm0");
+    getOtherVm().invoke(new SerializableRunnable("Wait for other vm") {
+      public void run() {
+        try {
+          synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+            boolean done = false;
+            while (!done) {
+              if (doTestPartialMessage_Listener.callbackArguments.size()> 0) {
+                CallbackWrapper last = (CallbackWrapper)
+                  doTestPartialMessage_Listener.callbackArguments.getLast();
+                Integer lastId = (Integer) last.callbackArgument;
+                if (lastId.intValue() == conflateId) {
+                  done = true;
+                } else {
+                  
doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
+                }
+              } else {
+                doTestPartialMessage_Listener.CONTROL_LOCK.wait(millisToWait);
+              }
+            }
+          }
+        } catch (InterruptedException ignore) {fail("interrupted");}
+      }
+    });
+    
+    // assert values on both listeners
+    getLogWriter().info("[testPartialMessage] assert callback arguments");
+    getOtherVm().invoke(new SerializableRunnable("Assert callback arguments") {
+      public void run() {
+        synchronized(doTestPartialMessage_Listener.CONTROL_LOCK) {
+          getLogWriter().info("[testPartialMessage] " +
+              "doTestPartialMessage_Listener.callbackArguments=" + 
+              doTestPartialMessage_Listener.callbackArguments);
+              
+          assertEquals(doTestPartialMessage_Listener.callbackArguments.size(),
+                       doTestPartialMessage_Listener.callbackTypes.size());
+                       
+          int i = 0;
+          Iterator argIter = 
+            doTestPartialMessage_Listener.callbackArguments.iterator();
+          Iterator typeIter = 
+            doTestPartialMessage_Listener.callbackTypes.iterator();
+            
+          while (argIter.hasNext()) {
+            CallbackWrapper wrapper = (CallbackWrapper) argIter.next();
+            Integer arg = (Integer) wrapper.callbackArgument;
+            typeIter.next(); // Integer type
+            if (arg.intValue() < partialId) {
+              continue;
+            }
+            assertEquals(new Integer(expectedArgs[i]), arg);
+            //assertEquals(CALLBACK_UPDATE_INTEGER, type);
+            i++;
+          }
+        }
+      }
+    });
+    
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
 
b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
index 41b3042..55a63ff 100755
--- 
a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
+++ 
b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/DistributedMemberDUnitTest.java
@@ -24,7 +24,11 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
+import org.junit.experimental.categories.Category;
+
 import com.gemstone.gemfire.distributed.internal.membership.*;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
 import junit.framework.AssertionFailedError;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java
----------------------------------------------------------------------
diff --git 
a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java
 
b/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java
deleted file mode 100755
index 7b93a36..0000000
--- 
a/gemfire-core/src/test/java/com/gemstone/gemfire/distributed/internal/locks/CollaborationJUnitDisabledTest.java
+++ /dev/null
@@ -1,562 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.gemstone.gemfire.distributed.internal.locks;
-
-import junit.framework.*;
-import java.util.*;
-
-import com.gemstone.gemfire.CancelCriterion;
-import com.gemstone.gemfire.LogWriter;
-import com.gemstone.gemfire.SystemFailure;
-import com.gemstone.gemfire.internal.logging.LocalLogWriter;
-import com.gemstone.gemfire.internal.logging.InternalLogWriter;
-
-import dunit.DistributedTestCase;
-import dunit.DistributedTestCase.WaitCriterion;
-
-/**
- * Tests the Collaboration Lock used internally by dlock service.
- *
- * @author Kirk Lund
- * @since 4.1.1
- */
-public class CollaborationJUnitDisabledTest extends TestCase {
-
-  protected LogWriter log = new LocalLogWriter(InternalLogWriter.INFO_LEVEL);
-  protected Collaboration collaboration;
-  
-  public CollaborationJUnitDisabledTest(String name) {
-    super(name);
-  }
-  
-  public void setUp() throws Exception {
-    this.collaboration = new Collaboration(new CancelCriterion() {
-
-      public String cancelInProgress() {
-        // TODO Auto-generated method stub
-        return null;
-      }
-
-      public RuntimeException generateCancelledException(Throwable e) {
-        // TODO Auto-generated method stub
-        return null;
-      }
-      
-    });
-  }
-  
-  public void tearDown() throws Exception {
-    this.collaboration = null;
-  }
-  
-  protected volatile boolean flagTestBlocksUntilRelease = false;
-  protected volatile boolean threadBStartedTestBlocksUntilRelease = false;
-  public void testBlocksUntilRelease() throws Exception {
-    this.log.info("[testBlocksUntilRelease]");
-    Thread threadA = new Thread(group, new Runnable() {
-      public void run() {
-        collaboration.acquireUninterruptibly("topicA");
-        try {
-          flagTestBlocksUntilRelease = true;
-          while(flagTestBlocksUntilRelease) {
-            try {
-              Thread.sleep(10);
-            }
-            catch (InterruptedException ignore) {fail("interrupted");}
-          }
-        }
-        finally {
-          collaboration.release();
-        }
-      }
-    });
-    
-    // thread one acquires
-    threadA.start();
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return CollaborationJUnitDisabledTest.this.flagTestBlocksUntilRelease;
-      }
-      public String description() {
-        return "waiting for thread";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadA));
-    
-    // thread two blocks until one releeases
-    Thread threadB = new Thread(group, new Runnable() {
-      public void run() {
-        threadBStartedTestBlocksUntilRelease = true;
-        collaboration.acquireUninterruptibly("topicB");
-        try {
-          flagTestBlocksUntilRelease = true;
-          WaitCriterion ev2 = new WaitCriterion() {
-            public boolean done() {
-              return !flagTestBlocksUntilRelease;
-            }
-            public String description() {
-              return "waiting for release";
-            }
-          };
-          DistributedTestCase.waitForCriterion(ev2, 20 * 1000, 200, true);
-        }
-        finally {
-          collaboration.release();
-        }
-      }
-    });
-    
-    // start up threadB
-    threadB.start();
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return threadBStartedTestBlocksUntilRelease;
-      }
-      public String description() {
-        return "waiting for thread b";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
-    
-    // threadA holds topic and threadB is waiting...
-    assertTrue(this.collaboration.hasCurrentTopic(threadA));
-    assertFalse(this.collaboration.hasCurrentTopic(threadB));
-
-    // let threadA release so that threadB gets lock
-    this.flagTestBlocksUntilRelease = false;
-    DistributedTestCase.join(threadA, 30 * 1000, null);
-    
-    // make sure threadB is doing what it's supposed to do...
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return flagTestBlocksUntilRelease;
-      }
-      public String description() {
-        return "threadB";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 5 * 1000, 200, true);
-    // threadB must have lock now... let threadB release
-    assertTrue(this.collaboration.hasCurrentTopic(threadB));
-    this.flagTestBlocksUntilRelease = false;
-    DistributedTestCase.join(threadB, 30 * 1000, null);
-
-    // collaboration should be free now    
-    assertFalse(this.collaboration.hasCurrentTopic(threadA));
-    assertFalse(this.collaboration.hasCurrentTopic(threadB));
-    assertFalse(this.collaboration.hasCurrentTopic());
-  }
-  
-  protected volatile boolean threadAFlag_TestLateComerJoinsIn = false;
-  protected volatile boolean threadBFlag_TestLateComerJoinsIn = false;
-  protected volatile boolean threadCFlag_TestLateComerJoinsIn = true;
-  protected volatile boolean threadDFlag_TestLateComerJoinsIn = false;
-  public void testLateComerJoinsIn() throws Exception {
-    this.log.info("[testLateComerJoinsIn]");
-    
-    final Object topicA = "topicA";
-    final Object topicB = "topicB";
-    
-    // threads one and two acquire
-    Thread threadA = new Thread(group, new Runnable() {
-      public void run() {
-        collaboration.acquireUninterruptibly(topicA);
-        try {
-          threadAFlag_TestLateComerJoinsIn = true;
-          WaitCriterion ev = new WaitCriterion() {
-            public boolean done() {
-              return !threadAFlag_TestLateComerJoinsIn;
-            }
-            public String description() {
-              return null;
-            }
-          };
-          DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-        }
-        finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadA.start();
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return threadAFlag_TestLateComerJoinsIn;
-      }
-      public String description() {
-        return "wait for ThreadA";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 30 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadA));
-    assertTrue(this.collaboration.isCurrentTopic(topicA));
-    
-    Thread threadB = new Thread(group, new Runnable() {
-      public void run() {
-        collaboration.acquireUninterruptibly(topicA);
-        try {
-          threadBFlag_TestLateComerJoinsIn = true;
-          WaitCriterion ev2 = new WaitCriterion() {
-            public boolean done() {
-              return !threadBFlag_TestLateComerJoinsIn;
-            }
-            public String description() {
-              return null;
-            }
-          };
-          DistributedTestCase.waitForCriterion(ev2, 60 * 1000, 200, true);
-        }
-        finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadB.start();
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return threadBFlag_TestLateComerJoinsIn;
-      }
-      public String description() {
-        return "";
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadB));
-    
-    // thread three blocks for new topic
-    Thread threadC = new Thread(group, new Runnable() {
-      public void run() {
-        threadCFlag_TestLateComerJoinsIn = false;
-        collaboration.acquireUninterruptibly(topicB);
-        try {
-          threadCFlag_TestLateComerJoinsIn = true;
-          WaitCriterion ev2 = new WaitCriterion() {
-            public boolean done() {
-              return !threadCFlag_TestLateComerJoinsIn;
-            }
-            public String description() {
-              return null;
-            }
-          };
-          DistributedTestCase.waitForCriterion(ev2, 60 * 1000, 200, true);
-        }
-        finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadC.start();
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return threadCFlag_TestLateComerJoinsIn;
-      }
-      public String description() {
-        return null;
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-    
-    // thread four (lateComer) acquires current topic immediately
-    Thread threadD = new Thread(group, new Runnable() {
-      public void run() {
-        collaboration.acquireUninterruptibly(topicA);
-        try {
-          threadDFlag_TestLateComerJoinsIn = true;
-          while(threadDFlag_TestLateComerJoinsIn) {
-            try {
-              Thread.sleep(10);
-            }
-            catch (InterruptedException ignore) {fail("interrupted");}
-          }
-        }
-        finally {
-          collaboration.release();
-        }
-      }
-    });
-    threadD.start();
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return threadDFlag_TestLateComerJoinsIn;
-      }
-      public String description() {
-        return null;
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadD));
-    
-    // release threadA
-    this.threadAFlag_TestLateComerJoinsIn = false;
-    DistributedTestCase.join(threadA, 30 * 1000, null);
-    assertFalse(this.collaboration.hasCurrentTopic(threadA));
-    assertTrue(this.collaboration.hasCurrentTopic(threadB));
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertTrue(this.collaboration.hasCurrentTopic(threadD));
-    assertTrue(this.collaboration.isCurrentTopic(topicA));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-    
-    // release threadB
-    this.threadBFlag_TestLateComerJoinsIn = false;
-    DistributedTestCase.join(threadB, 30 * 1000, null);
-    assertFalse(this.collaboration.hasCurrentTopic(threadB));
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertTrue(this.collaboration.hasCurrentTopic(threadD));
-    assertTrue(this.collaboration.isCurrentTopic(topicA));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-    
-    // release threadD
-    this.threadDFlag_TestLateComerJoinsIn = false;
-    DistributedTestCase.join(threadD, 30 * 1000, null);
-    ev = new WaitCriterion() {
-      public boolean done() {
-        return threadCFlag_TestLateComerJoinsIn;
-      }
-      public String description() {
-        return null;
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-    assertTrue(this.collaboration.hasCurrentTopic(threadC));
-    assertFalse(this.collaboration.hasCurrentTopic(threadD));
-    assertFalse(this.collaboration.isCurrentTopic(topicA));
-    assertTrue(this.collaboration.isCurrentTopic(topicB));
-    
-    // release threadC
-    this.threadCFlag_TestLateComerJoinsIn = false;
-    DistributedTestCase.join(threadC, 30 * 1000, null);
-    assertFalse(this.collaboration.hasCurrentTopic(threadC));
-    assertFalse(this.collaboration.isCurrentTopic(topicA));
-    assertFalse(this.collaboration.isCurrentTopic(topicB));
-  }
-  
-  protected List waitingList = Collections.synchronizedList(new ArrayList());
-  protected List fairnessList = Collections.synchronizedList(new ArrayList());
-  protected volatile boolean runTestFairnessStressfully = true;
-  public void testFairnessStressfully() throws Exception {
-    this.log.info("[testFairnessStressfully]");
-    final int numThreads = 20;
-    Thread threads[] = new Thread[numThreads];
-    
-    Runnable run = new Runnable() {
-      public void run() {
-        boolean released = false;
-        try {
-          String uniqueTopic = Thread.currentThread().getName();
-          while(runTestFairnessStressfully) {
-            waitingList.add(uniqueTopic);
-            collaboration.acquireUninterruptibly(uniqueTopic);
-            try {
-              released = false;
-              fairnessList.add(uniqueTopic);
-              waitingList.remove(uniqueTopic);
-            }
-            finally {
-              // wait for the other threads to line up...
-              WaitCriterion ev = new WaitCriterion() {
-                public boolean done() {
-                  return !runTestFairnessStressfully || waitingList.size() >= 
numThreads - 1;
-                }
-                public String description() {
-                  return "other threads lining up";
-                }
-              };
-              DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-              collaboration.release();
-              released = true;
-            }
-          }
-        }
-        finally {
-          if (!released) {
-            collaboration.release();
-          }
-        }
-      }
-    };
-    
-    try {
-      // many threads loop: acquire and release with unique topic
-      for (int t = 0; t < threads.length; t++) {
-        threads[t] = new Thread(group, run, String.valueOf(t));
-        threads[t].start();
-      }
-
-      log.info("Started all threads... waiting for test to complete.");
-            
-      // wait for numThreads * 10
-      WaitCriterion ev = new WaitCriterion() {
-        public boolean done() {
-          return fairnessList.size() >= numThreads * 20;
-        }
-        public String description() {
-          return "waiting for numThreads * 10";
-        }
-      };
-      DistributedTestCase.waitForCriterion(ev, 5 * 60 * 1000, 200, true);
-    }
-    finally {
-      if (this.runTestFairnessStressfully) {
-        this.runTestFairnessStressfully = false;
-      }
-    }
-    
-    for (int t = 0; t < threads.length; t++) {
-      DistributedTestCase.join(threads[t], 30 * 1000, null);
-    }
-    
-    // assert that all topics are acquired in order
-    // count number of occurrences of each thread
-    int count[] = new int[numThreads];
-    for (int i = 0; i < count.length; i++) { // shouldn't be necessary
-      count[i] = 0;
-    }
-    synchronized(this.fairnessList) {
-      for (Iterator iter = this.fairnessList.iterator(); iter.hasNext();) {
-        int id = Integer.valueOf((String)iter.next()).intValue();
-        count[id] = count[id]+1;
-      }
-    }
-    
-    int totalLocks = 0;
-    int minLocks = Integer.MAX_VALUE;
-    int maxLocks = 0;
-    for (int i = 0; i < count.length; i++) {
-      int locks = count[i];
-      this.log.fine("testFairnessStressfully thread-" + i + " acquired topic " 
+ 
-        locks + " times.");
-      if (locks < minLocks) minLocks = locks;
-      if (locks > maxLocks) maxLocks = locks;
-      totalLocks = totalLocks + locks;
-    }
-
-    this.log.info("[testFairnessStressfully] totalLocks=" + totalLocks + 
-                  " minLocks=" + minLocks +
-                  " maxLocks=" + maxLocks);
-
-    int expectedLocks = (totalLocks / numThreads) + 1;
-    
-    // NOTE: if you turn on fine logs, this deviation may be too small...
-    // slower machines may also fail depending on thread scheduling
-    int deviation = (int)(expectedLocks * 0.25);
-    int lowThreshold = expectedLocks - deviation;
-    int highThreshold = expectedLocks + deviation;
-
-    this.log.info("[testFairnessStressfully] deviation=" + deviation +
-                  " expectedLocks=" + expectedLocks + 
-                  " lowThreshold=" + lowThreshold +
-                  " highThreshold=" + highThreshold);
-                        
-    // if these assertions keep failing we'll have to rewrite the test
-    // to handle scheduling of the threads...
-                  
-    assertTrue("minLocks is less than lowThreshold",
-               minLocks >= lowThreshold);
-    assertTrue("maxLocks is greater than highThreshold",
-               maxLocks <= highThreshold);
-  }
-  
-  public void testHasCurrentTopic() throws Exception {
-    this.log.info("[testHasCurrentTopic]");
-    assertTrue(!this.collaboration.hasCurrentTopic());
-    this.collaboration.acquireUninterruptibly("testHasCurrentTopic");
-    try {
-      assertTrue(this.collaboration.hasCurrentTopic());
-    }
-    finally {
-      this.collaboration.release();
-    }
-    assertTrue(!this.collaboration.hasCurrentTopic());
-  }
-  
-  protected volatile boolean flagTestThreadHasCurrentTopic = false;
-  public void testThreadHasCurrentTopic() throws Exception {
-    this.log.info("[testThreadHasCurrentTopic]");
-    Thread thread = new Thread(group, new Runnable() {
-      public void run() {
-        collaboration.acquireUninterruptibly("testThreadHasCurrentTopic");
-        try {
-          flagTestThreadHasCurrentTopic = true;
-          WaitCriterion ev = new WaitCriterion() {
-            public boolean done() {
-              return !flagTestThreadHasCurrentTopic;
-            }
-            public String description() {
-              return null;
-            }
-          };
-          DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-        }
-        finally {
-          collaboration.release();
-        }
-      }
-    });
-    
-    // before starting thread, hasCurrentTopic(thread) returns false
-    assertTrue(!this.collaboration.hasCurrentTopic(thread));
-    thread.start();
-    WaitCriterion ev = new WaitCriterion() {
-      public boolean done() {
-        return flagTestThreadHasCurrentTopic;
-      }
-      public String description() {
-        return null;
-      }
-    };
-    DistributedTestCase.waitForCriterion(ev, 60 * 1000, 200, true);
-    
-    // after starting thread, hasCurrentTopic(thread) returns true
-    assertTrue(this.collaboration.hasCurrentTopic(thread));
-    this.flagTestThreadHasCurrentTopic = false;
-    DistributedTestCase.join(thread, 30 * 1000, null);
-    
-    // after thread finishes, hasCurrentTopic(thread) returns false
-    assertTrue(!this.collaboration.hasCurrentTopic(thread));
-  }
-  
-  public void testIsCurrentTopic() throws Exception {
-    this.log.info("[testIsCurrentTopic]");
-    Object topic = "testIsCurrentTopic";
-    assertTrue(!this.collaboration.isCurrentTopic(topic));
-    this.collaboration.acquireUninterruptibly(topic);
-    try {
-      assertTrue(this.collaboration.isCurrentTopic(topic));
-    }
-    finally {
-      this.collaboration.release();
-    }
-    assertTrue(!this.collaboration.isCurrentTopic(topic));
-  }
-
-  protected final ThreadGroup group = 
-      new ThreadGroup("CollaborationJUnitTest Threads") {
-        public void uncaughtException(Thread t, Throwable e)
-        {
-          if (e instanceof VirtualMachineError) {
-            SystemFailure.setFailure((VirtualMachineError)e); // don't throw
-          }
-          String s = "Uncaught exception in thread " + t;
-          log.error(s, e);
-          fail(s);
-        }
-      };
-}
-

Reply via email to