http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java
new file mode 100755
index 0000000..481863c
--- /dev/null
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/HAInterestTestCase.java
@@ -0,0 +1,1018 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache.tier.sockets;
+
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.MirrorType;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionAttributes;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.internal.Connection;
+import com.gemstone.gemfire.cache.client.internal.PoolImpl;
+import com.gemstone.gemfire.cache.client.internal.RegisterInterestTracker;
+import com.gemstone.gemfire.cache.client.internal.ServerRegionProxy;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.distributed.internal.ServerLocation;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverAdapter;
+import com.gemstone.gemfire.internal.cache.ClientServerObserverHolder;
+import com.gemstone.gemfire.internal.cache.CacheServerImpl;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.tier.InterestType;
+import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
+
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.VM;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.experimental.categories.Category;
+
+/**
+ * Tests Interest Registration Functionality
+ */
+@SuppressWarnings({"deprecation", "rawtypes", "serial", "unchecked"})
+public class HAInterestTestCase extends DistributedTestCase {
+  
+  protected static final int TIMEOUT_MILLIS = 60 * 1000;
+  protected static final int INTERVAL_MILLIS = 10;
+  
+  protected static final String REGION_NAME = "HAInterestBaseTest_region";
+  
+  protected static final String k1 = "k1";
+  protected static final String k2 = "k2";
+  protected static final String client_k1 = "client-k1";
+  protected static final String client_k2 = "client-k2";
+  protected static final String server_k1 = "server-k1";
+  protected static final String server_k2 = "server-k2";
+  protected static final String server_k1_updated = "server_k1_updated";
+
+  protected static Cache cache = null;
+  protected static PoolImpl pool = null;
+  protected static Connection conn = null;
+
+  protected static int PORT1;
+  protected static int PORT2;
+  protected static int PORT3;
+
+  protected static boolean isBeforeRegistrationCallbackCalled = false;
+  protected static boolean isBeforeInterestRecoveryCallbackCalled = false;
+  protected static boolean isAfterRegistrationCallbackCalled = false;
+
+  protected static Host host = null;
+  protected static VM server1 = null;
+  protected static VM server2 = null;
+  protected static VM server3 = null;
+  
+  protected volatile static boolean exceptionOccured = false;
+
+  public HAInterestTestCase(String name) {
+    super(name);
+  }
+
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    host = Host.getHost(0);
+    server1 = host.getVM(0);
+    server2 = host.getVM(1);
+    server3 = host.getVM(2);
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    // start servers first
+    PORT1 = ((Integer) server1.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+    PORT2 = ((Integer) server2.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+    PORT3 = ((Integer) server3.invoke(HAInterestTestCase.class, "createServerCache")).intValue();
+    exceptionOccured = false;
+    addExpectedException("java.net.ConnectException: Connection refused: connect");
+  }
+
+  @Override
+  public void tearDown2() throws Exception {
+    // close the clients first
+    closeCache();
+
+    // then close the servers
+    server1.invoke(HAInterestTestCase.class, "closeCache");
+    server2.invoke(HAInterestTestCase.class, "closeCache");
+    server3.invoke(HAInterestTestCase.class, "closeCache");
+    CacheServerTestUtil.resetDisableShufflingOfEndpointsFlag();
+  }
+
+  public static void closeCache() {
+    PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
+    PoolImpl.BEFORE_PRIMARY_IDENTIFICATION_FROM_BACKUP_CALLBACK_FLAG = false;
+    PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
+    PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
+    HAInterestTestCase.isAfterRegistrationCallbackCalled = false;
+    HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = false;
+    HAInterestTestCase.isBeforeRegistrationCallbackCalled = false;
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache.getDistributedSystem().disconnect();
+    }
+    cache = null;
+    pool = null;
+    conn = null;
+  }
+  
+  /**
+   * Return the current primary, waiting for a primary to exist.
+   * 
+   * @since 5.7
+   */
+  public static VM getPrimaryVM() {
+    return getPrimaryVM(null);
+  }
+
+  /**
+   * Return the current primary, waiting for a primary to exist and for it
+   * not to be the oldPrimary (if oldPrimary is not null).
+   * 
+   * @since 5.7
+   */
+  public static VM getPrimaryVM(final VM oldPrimary) {
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        int primaryPort = pool.getPrimaryPort();
+        if (primaryPort == -1) {
+          return false;
+        }
+        // we have a primary
+        VM currentPrimary = getServerVM(primaryPort);
+        if (currentPrimary != oldPrimary) {
+          return true;
+        }
+        return false;
+      }
+      @Override
+      public String description() {
+        return "waiting for primary";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    int primaryPort = pool.getPrimaryPort();
+    assertTrue(primaryPort != -1);
+    VM currentPrimary = getServerVM(primaryPort);
+    assertTrue(currentPrimary != oldPrimary);
+    return currentPrimary;
+  }
+
+  public static VM getBackupVM() {
+    return getBackupVM(null);
+  }
+
+  public static VM getBackupVM(VM stoppedBackup) {
+    VM currentPrimary = getPrimaryVM(null);
+    if (currentPrimary != server2 && server2 != stoppedBackup) {
+      return server2;
+    } else if (currentPrimary != server3 && server3 != stoppedBackup) {
+      return server3;
+    } else if (currentPrimary != server1 && server1 != stoppedBackup) {
+      return server1;
+    } else {
+      fail("expected currentPrimary " + currentPrimary + " to be " + server1 + 
", or " + server2 + ", or " + server3);
+      return null;
+    }
+  }
+
+  /**
+   * Given a server vm (server1, server2, or server3) return its port.
+   * 
+   * @since 5.7
+   */
+  public static int getServerPort(VM vm) {
+    if (vm == server1) {
+      return PORT1;
+    } else if (vm == server2) {
+      return PORT2;
+    } else if (vm == server3) {
+      return PORT3;
+    } else {
+      fail("expected vm " + vm + " to be " + server1 + ", or " + server2 + ", 
or " + server3);
+      return -1;
+    }
+  }
+
+  /**
+   * Given a server port (PORT1, PORT2, or PORT3) return its vm.
+   * 
+   * @since 5.7
+   */
+  public static VM getServerVM(int port) {
+    if (port == PORT1) {
+      return server1;
+    } else if (port == PORT2) {
+      return server2;
+    } else if (port == PORT3) {
+      return server3;
+    } else {
+      fail("expected port " + port + " to be " + PORT1 + ", or " + PORT2 + ", 
or " + PORT3);
+      return null;
+    }
+  }
+
+  public static void verifyRefreshedEntriesFromServer() {
+    final Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r1);
+
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        Region.Entry re = r1.getEntry(k1);
+        if (re == null)
+          return false;
+        Object val = re.getValue();
+        return client_k1.equals(val);
+      }
+      @Override
+      public String description() {
+        return "waiting for client_k1 refresh from server";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        Region.Entry re = r1.getEntry(k2);
+        if (re == null)
+          return false;
+        Object val = re.getValue();
+        return client_k2.equals(val);
+      }
+      @Override
+      public String description() {
+        return "waiting for client_k2 refresh from server";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+  }
+
+  public static void verifyDeadAndLiveServers(final int expectedDeadServers, final int expectedLiveServers) {
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return pool.getConnectedServerCount() == expectedLiveServers;
+      }
+      @Override
+      public String description() {
+        return "waiting for pool.getConnectedServerCount() == 
expectedLiveServer";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+  }
+
+  public static void putK1andK2() {
+    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r1);
+    r1.put(k1, server_k1);
+    r1.put(k2, server_k2);
+  }
+
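+  /**
+   * Installs an observer whose beforeInterestRecovery callback restarts the
+   * current backup and stops the current primary, forcing a further failover
+   * while interest recovery is still in progress.
+   */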
+  public static void setClientServerObserverForBeforeInterestRecoveryFailure() {
+    PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
+    ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+      public void beforeInterestRecovery() {
+        synchronized (HAInterestTestCase.class) {
+          Thread t = new Thread() {
+            public void run() {
+              getBackupVM().invoke(HAInterestTestCase.class, "startServer");
+              getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+            }
+          };
+          t.start();
+          try {
+            DistributedTestCase.join(t, 30 * 1000, getLogWriter());
+          } catch (Exception ignore) {
+            exceptionOccured = true;
+          }
+          HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true;
+          HAInterestTestCase.class.notify();
+          PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
+        }
+      }
+    });
+  }
+
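+  /**
+   * Installs an observer whose beforeInterestRecovery callback updates k1 on
+   * the server, so callers can later verify that the client sees the
+   * refreshed value.
+   */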
+  public static void setClientServerObserverForBeforeInterestRecovery() {
+    PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = true;
+    ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+      public void beforeInterestRecovery() {
+        synchronized (HAInterestTestCase.class) {
+          Thread t = new Thread() {
+            public void run() {
+              Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+              assertNotNull(r1);
+              r1.put(k1, server_k1_updated);
+            }
+          };
+          t.start();
+
+          HAInterestTestCase.isBeforeInterestRecoveryCallbackCalled = true;
+          HAInterestTestCase.class.notify();
+          PoolImpl.BEFORE_RECOVER_INTEREST_CALLBACK_FLAG = false;
+        }
+      }
+    });
+  }
+
+  public static void waitForBeforeInterestRecoveryCallBack() throws InterruptedException {
+    assertNotNull(cache);
+    synchronized (HAInterestTestCase.class) {
+      while (!isBeforeInterestRecoveryCallbackCalled) {
+        HAInterestTestCase.class.wait();
+      }
+    }
+  }
+
+  public static void setClientServerObserverForBeforeRegistration(final VM vm) {
+    PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = true;
+    ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+      public void beforeInterestRegistration() {
+        synchronized (HAInterestTestCase.class) {
+          vm.invoke(HAInterestTestCase.class, "startServer");
+          HAInterestTestCase.isBeforeRegistrationCallbackCalled = true;
+          HAInterestTestCase.class.notify();
+          PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
+        }
+      }
+    });
+  }
+
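+  /**
+   * Blocks until the observer installed above signals, via notify() on the
+   * class lock, that the before-registration callback has run.
+   */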
+  public static void waitForBeforeRegistrationCallback() throws InterruptedException {
+    assertNotNull(cache);
+    synchronized (HAInterestTestCase.class) {
+      while (!isBeforeRegistrationCallbackCalled) {
+        HAInterestTestCase.class.wait();
+      }
+    }
+  }
+
+  public static void setClientServerObserverForAfterRegistration(final VM vm) {
+    PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = true;
+    ClientServerObserverHolder.setInstance(new ClientServerObserverAdapter() {
+      public void afterInterestRegistration() {
+        synchronized (HAInterestTestCase.class) {
+          vm.invoke(HAInterestTestCase.class, "startServer");
+          HAInterestTestCase.isAfterRegistrationCallbackCalled = true;
+          HAInterestTestCase.class.notify();
+          PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
+        }
+      }
+    });
+  }
+
+  public static void waitForAfterRegistrationCallback() throws InterruptedException {
+    assertNotNull(cache);
+    if (!isAfterRegistrationCallbackCalled) {
+      synchronized (HAInterestTestCase.class) {
+        while (!isAfterRegistrationCallbackCalled) {
+          HAInterestTestCase.class.wait();
+        }
+      }
+    }
+  }
+
+  public static void unSetClientServerObserverForRegistrationCallback() {
+    synchronized (HAInterestTestCase.class) {
+      PoolImpl.BEFORE_REGISTER_CALLBACK_FLAG = false;
+      PoolImpl.AFTER_REGISTER_CALLBACK_FLAG = false;
+      HAInterestTestCase.isBeforeRegistrationCallbackCalled = false;
+      HAInterestTestCase.isAfterRegistrationCallbackCalled = false;
+    }
+  }
+
+  public static void verifyDispatcherIsAlive() {
+    assertEquals("More than one BridgeServer", 1, 
cache.getCacheServers().size());
+    
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return cache.getCacheServers().size() == 1;
+      }
+      @Override
+      public String description() {
+        return "waiting for cache.getCacheServers().size() == 1";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+    assertNotNull(bs);
+    assertNotNull(bs.getAcceptor());
+    assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+    final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+
+    wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return ccn.getClientProxies().size() > 0;
+      }
+      @Override
+      public String description() {
+        return "waiting for ccn.getClientProxies().size() > 0";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    wc = new WaitCriterion() {
+      Iterator iter_prox;
+      CacheClientProxy proxy;
+
+      @Override
+      public boolean done() {
+        iter_prox = ccn.getClientProxies().iterator();
+        if (iter_prox.hasNext()) {
+          proxy = (CacheClientProxy) iter_prox.next();
+          return proxy._messageDispatcher.isAlive();
+        } else {
+          return false;
+        }
+      }
+
+      @Override
+      public String description() {
+        return "waiting for CacheClientProxy _messageDispatcher to be alive";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+  }
+
+  public static void verifyDispatcherIsNotAlive() {
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return cache.getCacheServers().size() == 1;
+      }
+      @Override
+      public String description() {
+        return "cache.getCacheServers().size() == 1";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+    assertNotNull(bs);
+    assertNotNull(bs.getAcceptor());
+    assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+    final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+    
+    wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return ccn.getClientProxies().size() > 0;
+      }
+      @Override
+      public String description() {
+        return "waiting for ccn.getClientProxies().size() > 0";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    Iterator iter_prox = ccn.getClientProxies().iterator();
+    if (iter_prox.hasNext()) {
+      CacheClientProxy proxy = (CacheClientProxy) iter_prox.next();
+      assertFalse("Dispatcher on secondary should not be alive", 
proxy._messageDispatcher.isAlive());
+    }
+  }
+
+  public static void createEntriesK1andK2OnServer() {
+    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r1);
+    if (!r1.containsKey(k1)) {
+      r1.create(k1, server_k1);
+    }
+    if (!r1.containsKey(k2)) {
+      r1.create(k2, server_k2);
+    }
+    assertEquals(r1.getEntry(k1).getValue(), server_k1);
+    assertEquals(r1.getEntry(k2).getValue(), server_k2);
+  }
+
+  public static void createEntriesK1andK2() {
+    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r1);
+    if (!r1.containsKey(k1)) {
+      r1.create(k1, client_k1);
+    }
+    if (!r1.containsKey(k2)) {
+      r1.create(k2, client_k2);
+    }
+    assertEquals(r1.getEntry(k1).getValue(), client_k1);
+    assertEquals(r1.getEntry(k2).getValue(), client_k2);
+  }
+
+  public static void createServerEntriesK1andK2() {
+    Region r1 = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r1);
+    if (!r1.containsKey(k1)) {
+      r1.create(k1, server_k1);
+    }
+    if (!r1.containsKey(k2)) {
+      r1.create(k2, server_k2);
+    }
+    assertEquals(r1.getEntry(k1).getValue(), server_k1);
+    assertEquals(r1.getEntry(k2).getValue(), server_k2);
+  }
+
+  public static void registerK1AndK2() {
+    Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    List list = new ArrayList();
+    list.add(k1);
+    list.add(k2);
+    r.registerInterest(list, InterestResultPolicy.KEYS_VALUES);
+  }
+
+  public static void reRegisterK1AndK2() {
+    Region r = cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    List list = new ArrayList();
+    list.add(k1);
+    list.add(k2);
+    r.registerInterest(list);
+  }
+
+  public static void startServer() throws IOException {
+    Cache c = CacheFactory.getAnyInstance();
+    assertEquals("More than one BridgeServer", 1, c.getCacheServers().size());
+    CacheServerImpl bs = (CacheServerImpl) c.getCacheServers().iterator().next();
+    assertNotNull(bs);
+    bs.start();
+  }
+
+  public static void stopServer() {
+    assertEquals("More than one BridgeServer", 1, 
cache.getCacheServers().size());
+    CacheServerImpl bs = (CacheServerImpl) 
cache.getCacheServers().iterator().next();
+    assertNotNull(bs);
+    bs.stop();
+  }
+
+  public static void stopPrimaryAndRegisterK1AndK2AndVerifyResponse() {
+    LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    ServerRegionProxy srp = new ServerRegionProxy(r);
+
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return pool.getConnectedServerCount() == 3;
+      }
+      @Override
+      public String description() {
+        return "connected server count never became 3";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    // close primaryEP
+    getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+    List list = new ArrayList();
+    list.add(k1);
+    list.add(k2);
+    List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+    assertNotNull(serverKeys);
+    List resultKeys = (List) serverKeys.get(0);
+    assertEquals(2, resultKeys.size());
+    assertTrue(resultKeys.contains(k1));
+    assertTrue(resultKeys.contains(k2));
+  }
+
+  public static void stopPrimaryAndUnregisterRegisterK1() {
+    LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    ServerRegionProxy srp = new ServerRegionProxy(r);
+
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return pool.getConnectedServerCount() == 3;
+      }
+      @Override
+      public String description() {
+        return "connected server count never became 3";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    // close primaryEP
+    getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+    List list = new ArrayList();
+    list.add(k1);
+    srp.unregisterInterest(list, InterestType.KEY, false, false);
+  }
+
+  public static void stopBothPrimaryAndSecondaryAndRegisterK1AndK2AndVerifyResponse() {
+    LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    ServerRegionProxy srp = new ServerRegionProxy(r);
+
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return pool.getConnectedServerCount() == 3;
+      }
+      @Override
+      public String description() {
+        return "connected server count never became 3";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    // close primaryEP
+    VM backup = getBackupVM();
+    getPrimaryVM().invoke(HAInterestTestCase.class, "stopServer");
+    // close secondary
+    backup.invoke(HAInterestTestCase.class, "stopServer");
+    List list = new ArrayList();
+    list.add(k1);
+    list.add(k2);
+    List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+
+    assertNotNull(serverKeys);
+    List resultKeys = (List) serverKeys.get(0);
+    assertEquals(2, resultKeys.size());
+    assertTrue(resultKeys.contains(k1));
+    assertTrue(resultKeys.contains(k2));
+  }
+
+  /**
+   * returns the secondary that was stopped
+   */
+  public static VM stopSecondaryAndRegisterK1AndK2AndVerifyResponse() {
+    LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    ServerRegionProxy srp = new ServerRegionProxy(r);
+
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return pool.getConnectedServerCount() == 3;
+      }
+      @Override
+      public String description() {
+        return "Never got three connected servers";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    // close secondary EP
+    VM result = getBackupVM();
+    result.invoke(HAInterestTestCase.class, "stopServer");
+    List list = new ArrayList();
+    list.add(k1);
+    list.add(k2);
+    List serverKeys = srp.registerInterest(list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+
+    assertNotNull(serverKeys);
+    List resultKeys = (List) serverKeys.get(0);
+    assertEquals(2, resultKeys.size());
+    assertTrue(resultKeys.contains(k1));
+    assertTrue(resultKeys.contains(k2));
+    return result;
+  }
+
+  /**
+   * returns the secondary that was stopped
+   */
+  public static VM stopSecondaryAndUNregisterK1() {
+    LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    ServerRegionProxy srp = new ServerRegionProxy(r);
+
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return pool.getConnectedServerCount() == 3;
+      }
+      @Override
+      public String description() {
+        return "connected server count never became 3";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    // close secondary EP
+    VM result = getBackupVM();
+    result.invoke(HAInterestTestCase.class, "stopServer");
+    List list = new ArrayList();
+    list.add(k1);
+    srp.unregisterInterest(list, InterestType.KEY, false, false);
+    return result;
+  }
+
+  public static void registerK1AndK2OnPrimaryAndSecondaryAndVerifyResponse() {
+    ServerLocation primary = pool.getPrimary();
+    ServerLocation secondary = (ServerLocation) pool.getRedundants().get(0);
+    LocalRegion r = (LocalRegion) cache.getRegion(Region.SEPARATOR + REGION_NAME);
+    assertNotNull(r);
+    ServerRegionProxy srp = new ServerRegionProxy(r);
+    List list = new ArrayList();
+    list.add(k1);
+    list.add(k2);
+
+    // Primary server
+    List serverKeys1 = srp.registerInterestOn(primary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+    assertNotNull(serverKeys1);
+    // expect serverKeys in response from primary
+    List resultKeys = (List) serverKeys1.get(0);
+    assertEquals(2, resultKeys.size());
+    assertTrue(resultKeys.contains(k1));
+    assertTrue(resultKeys.contains(k2));
+
+    // Secondary server
+    List serverKeys2 = srp.registerInterestOn(secondary, list, InterestType.KEY, InterestResultPolicy.KEYS, false, r.getAttributes().getDataPolicy().ordinal);
+    // if the list is null then it is empty
+    if (serverKeys2 != null) {
+      // no serverKeys in response from secondary
+      assertTrue(serverKeys2.isEmpty());
+    }
+  }
+
+  public static void verifyInterestRegistration() {
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return cache.getCacheServers().size() == 1;
+      }
+      @Override
+      public String description() {
+        return "waiting for cache.getCacheServers().size() == 1";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+    assertNotNull(bs);
+    assertNotNull(bs.getAcceptor());
+    assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+    final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+    
+    wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return ccn.getClientProxies().size() > 0;
+      }
+      @Override
+      public String description() {
+        return "waiting for ccn.getClientProxies().size() > 0";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    Iterator iter_prox = ccn.getClientProxies().iterator();
+
+    if (iter_prox.hasNext()) {
+      final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
+      
+      wc = new WaitCriterion() {
+        @Override
+        public boolean done() {
+          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+              .getProfile(Region.SEPARATOR + REGION_NAME)
+              .getKeysOfInterestFor(ccp.getProxyID());
+          return keysMap != null && keysMap.size() == 2;
+        }
+        @Override
+        public String description() {
+          return "waiting for keys of interest to include 2 keys";
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+          .getProfile(Region.SEPARATOR + REGION_NAME)
+          .getKeysOfInterestFor(ccp.getProxyID());
+      assertNotNull(keysMap);
+      assertEquals(2, keysMap.size());
+      assertTrue(keysMap.contains(k1));
+      assertTrue(keysMap.contains(k2));
+    }
+  }
+
+  public static void verifyInterestUNRegistration() {
+    WaitCriterion wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return cache.getCacheServers().size() == 1;
+      }
+      @Override
+      public String description() {
+        return "waiting for cache.getCacheServers().size() == 1";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    CacheServerImpl bs = (CacheServerImpl) cache.getCacheServers().iterator().next();
+    assertNotNull(bs);
+    assertNotNull(bs.getAcceptor());
+    assertNotNull(bs.getAcceptor().getCacheClientNotifier());
+    final CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier();
+    
+    wc = new WaitCriterion() {
+      @Override
+      public boolean done() {
+        return ccn.getClientProxies().size() > 0;
+      }
+      @Override
+      public String description() {
+        return "waiting for ccn.getClientProxies().size() > 0";
+      }
+    };
+    DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+    Iterator iter_prox = ccn.getClientProxies().iterator();
+    if (iter_prox.hasNext()) {
+      final CacheClientProxy ccp = (CacheClientProxy) iter_prox.next();
+      
+      wc = new WaitCriterion() {
+        @Override
+        public boolean done() {
+          Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+              .getProfile(Region.SEPARATOR + REGION_NAME)
+              .getKeysOfInterestFor(ccp.getProxyID());
+          return keysMap != null;
+        }
+        @Override
+        public String description() {
+          return "waiting for keys of interest to not be null";
+        }
+      };
+      DistributedTestCase.waitForCriterion(wc, TIMEOUT_MILLIS, INTERVAL_MILLIS, true);
+
+      Set keysMap = (Set) ccp.cils[RegisterInterestTracker.interestListIndex]
+          .getProfile(Region.SEPARATOR + REGION_NAME)
+          .getKeysOfInterestFor(ccp.getProxyID());
+      assertNotNull(keysMap);
+      assertEquals(1, keysMap.size());
+      assertFalse(keysMap.contains(k1));
+      assertTrue(keysMap.contains(k2));
+    }
+  }
+
+  private void createCache(Properties props) throws Exception {
+    DistributedSystem ds = getSystem(props);
+    assertNotNull(ds);
+    ds.disconnect();
+    ds = getSystem(props);
+    cache = CacheFactory.create(ds);
+    assertNotNull(cache);
+  }
+
+  public static void createClientPoolCache(String testName, String host) throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+    new HAInterestTestCase("temp").createCache(props);
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    PoolImpl p;
+    try {
+      p = (PoolImpl) PoolManager.createFactory()
+          .addServer(host, PORT1)
+          .addServer(host, PORT2)
+          .addServer(host, PORT3)
+          .setSubscriptionEnabled(true)
+          .setSubscriptionRedundancy(-1)
+          .setReadTimeout(1000)
+          .setPingInterval(1000)
+          // retryInterval should be longer so that only the registerInterest
+          // thread will initiate failover
+          // .setRetryInterval(20000)
+          .create("HAInterestBaseTestPool");
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setConcurrencyChecksEnabled(true);
+    factory.setPoolName(p.getName());
+
+    cache.createRegion(REGION_NAME, factory.create());
+    pool = p;
+    conn = pool.acquireConnection();
+    assertNotNull(conn);
+  }
+
+  public static void createClientPoolCacheWithSmallRetryInterval(String testName, String host) throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+    new HAInterestTestCase("temp").createCache(props);
+    CacheServerTestUtil.disableShufflingOfEndpoints();
+    PoolImpl p;
+    try {
+      p = (PoolImpl) PoolManager.createFactory()
+          .addServer(host, PORT1)
+          .addServer(host, PORT2)
+          .setSubscriptionEnabled(true)
+          .setSubscriptionRedundancy(-1)
+          .setReadTimeout(1000)
+          .setSocketBufferSize(32768)
+          .setMinConnections(6)
+          .setPingInterval(200)
+          // .setRetryInterval(200)
+          // retryAttempts 3
+          .create("HAInterestBaseTestPool");
+    } finally {
+      CacheServerTestUtil.enableShufflingOfEndpoints();
+    }
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setConcurrencyChecksEnabled(true);
+    factory.setPoolName(p.getName());
+
+    cache.createRegion(REGION_NAME, factory.create());
+
+    pool = p;
+    conn = pool.acquireConnection();
+    assertNotNull(conn);
+  }
+
+  public static void createClientPoolCacheConnectionToSingleServer(String testName, String hostName) throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0");
+    props.setProperty(DistributionConfig.LOCATORS_NAME, "");
+    new HAInterestTestCase("temp").createCache(props);
+    PoolImpl p = (PoolImpl) PoolManager.createFactory()
+        .addServer(hostName, PORT1)
+        .setSubscriptionEnabled(true)
+        .setSubscriptionRedundancy(-1)
+        .setReadTimeout(1000)
+        // .setRetryInterval(20)
+        .create("HAInterestBaseTestPool");
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setConcurrencyChecksEnabled(true);
+    factory.setPoolName(p.getName());
+
+    cache.createRegion(REGION_NAME, factory.create());
+
+    pool = p;
+    conn = pool.acquireConnection();
+    assertNotNull(conn);
+  }
+
+  public static Integer createServerCache() throws Exception {
+    new HAInterestTestCase("temp").createCache(new Properties());
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.DISTRIBUTED_ACK);
+    factory.setEnableBridgeConflation(true);
+    factory.setMirrorType(MirrorType.KEYS_VALUES);
+    factory.setConcurrencyChecksEnabled(true);
+    cache.createRegion(REGION_NAME, factory.create());
+
+    CacheServer server = cache.addCacheServer();
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    server.setPort(port);
+    server.setMaximumTimeBetweenPings(180000);
+    // ensures updates are sent instead of invalidations
+    server.setNotifyBySubscription(true);
+    server.start();
+    return new Integer(server.getPort());
+  }
+
+  public static Integer createServerCacheWithLocalRegion() throws Exception {
+    new HAInterestTestCase("temp").createCache(new Properties());
+    AttributesFactory factory = new AttributesFactory();
+    factory.setScope(Scope.LOCAL);
+    factory.setConcurrencyChecksEnabled(true);
+    RegionAttributes attrs = factory.create();
+    cache.createRegion(REGION_NAME, attrs);
+
+    CacheServer server = cache.addCacheServer();
+    int port = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    server.setPort(port);
+    // ensures updates are sent instead of invalidations
+    server.setNotifyBySubscription(true);
+    server.setMaximumTimeBetweenPings(180000);
+    server.start();
+    return new Integer(server.getPort());
+  }
+}
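
For reference, a typical test built on this base class composes the static
helpers above; the sketch below is illustrative only (the subclass and test
method are hypothetical, assuming dunit's getServerHostName helper):

  public class HAInterestRecoveryExampleTest extends HAInterestTestCase {
    public HAInterestRecoveryExampleTest(String name) {
      super(name);
    }

    public void testPrimaryFailover() throws Exception {
      // setUp() has already started three cache servers.
      createClientPoolCache(getName(), getServerHostName(host));
      createEntriesK1andK2();
      registerK1AndK2();
      // Stop the current primary, then wait for a different server to
      // take over and for the pool to settle at two live servers.
      VM oldPrimary = getPrimaryVM();
      oldPrimary.invoke(HAInterestTestCase.class, "stopServer");
      getPrimaryVM(oldPrimary);
      verifyDeadAndLiveServers(1, 2);
    }
  }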

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
index b12f55b..b6bfe22 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/tier/sockets/command/CommitCommandTest.java
@@ -22,12 +22,18 @@ import static org.mockito.Mockito.when;
 import java.io.IOException;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.CancelCriterion;
 import com.gemstone.gemfire.cache.Cache;
 import com.gemstone.gemfire.internal.cache.tier.sockets.Message;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ServerConnection;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
+/**
+ * Exposes GEODE-537: NPE in JTA AFTER_COMPLETION command processing
+ */
+@Category(UnitTest.class)
 public class CommitCommandTest {
 
        /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
index 77d7995..1f72a6b 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/LogWriterPerformanceTest.java
@@ -21,16 +21,22 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Properties;
 
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
+
 import com.gemstone.gemfire.LogWriter;
 import com.gemstone.gemfire.distributed.internal.DistributionConfig;
 import com.gemstone.gemfire.distributed.internal.DistributionConfigImpl;
 import com.gemstone.gemfire.internal.util.IOUtils;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
 
 /**
  * Tests performance of logging when level is OFF.
  * 
  * @author Kirk Lund
  */
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
 public class LogWriterPerformanceTest extends LoggingPerformanceTestCase {
 
   public LogWriterPerformanceTest(String name) {
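
The @Category annotations above only take effect when a runner or build
filters on them; for instance, a JUnit 4 suite could exclude the
PerformanceTest category (an illustrative sketch, not part of this commit;
the suite class name is hypothetical):

  import org.junit.experimental.categories.Categories;
  import org.junit.experimental.categories.Categories.ExcludeCategory;
  import org.junit.runner.RunWith;
  import org.junit.runners.Suite.SuiteClasses;

  import com.gemstone.gemfire.test.junit.categories.PerformanceTest;

  // Runs the listed classes but skips any test tagged PerformanceTest.
  @RunWith(Categories.class)
  @ExcludeCategory(PerformanceTest.class)
  @SuiteClasses({ LogWriterPerformanceTest.class })
  public class NonPerformanceSuite {
  }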

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
index f98868b..caedadc 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2DisabledPerformanceTest.java
@@ -20,7 +20,13 @@ import java.io.IOException;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
 
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
+
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
 public class Log4J2DisabledPerformanceTest extends Log4J2PerformanceTest {
 
   public Log4J2DisabledPerformanceTest(String name) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
index 6d75f80..1ad30bd 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/Log4J2PerformanceTest.java
@@ -28,11 +28,16 @@ import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.internal.FileUtil;
 import com.gemstone.gemfire.internal.logging.LoggingPerformanceTestCase;
 import com.gemstone.gemfire.internal.util.IOUtils;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
 
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
 public class Log4J2PerformanceTest extends LoggingPerformanceTestCase {
 
   protected static final int DEFAULT_LOG_FILE_SIZE_LIMIT = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
index f964208..4be34c7 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerDisabledPerformanceTest.java
@@ -20,7 +20,13 @@ import java.io.IOException;
 
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
 
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
+
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
 public class LogWriterLoggerDisabledPerformanceTest extends LogWriterLoggerPerformanceTest {
 
   public LogWriterLoggerDisabledPerformanceTest(String name) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
index 506e487..3d8ee46 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/logging/log4j/LogWriterLoggerPerformanceTest.java
@@ -27,11 +27,16 @@ import org.apache.commons.io.FileUtils;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.core.config.ConfigurationFactory;
+import org.junit.Ignore;
+import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.internal.FileUtil;
 import com.gemstone.gemfire.internal.logging.LoggingPerformanceTestCase;
 import com.gemstone.gemfire.internal.util.IOUtils;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
 
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
 public class LogWriterLoggerPerformanceTest extends LoggingPerformanceTestCase {
 
   protected static final int DEFAULT_LOG_FILE_SIZE_LIMIT = Integer.MAX_VALUE;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/dunit/DistributedTestCase.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/DistributedTestCase.java b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
index a3d4785..6fa560f 100755
--- a/gemfire-core/src/test/java/dunit/DistributedTestCase.java
+++ b/gemfire-core/src/test/java/dunit/DistributedTestCase.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
+import org.junit.experimental.categories.Category;
 import org.springframework.data.gemfire.support.GemfireCache;
 
 import junit.framework.TestCase;
@@ -86,6 +87,7 @@ import com.gemstone.gemfire.internal.logging.LogWriterImpl;
 import com.gemstone.gemfire.internal.logging.ManagerLogWriter;
 import com.gemstone.gemfire.internal.logging.log4j.LogWriterLogger;
 import com.gemstone.gemfire.management.internal.cli.LogWrapper;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
 
 import dunit.standalone.DUnitLauncher;
 
@@ -101,6 +103,7 @@ import dunit.standalone.DUnitLauncher;
  *
  * @author David Whitlock
  */
+@Category(DistributedTest.class)
 @SuppressWarnings("serial")
 public abstract class DistributedTestCase extends TestCase implements java.io.Serializable {
   private static final Logger logger = LogService.getLogger();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
index 7fc762f..6366853 100644
--- a/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
+++ b/gemfire-core/src/test/java/dunit/standalone/ProcessManager.java
@@ -38,6 +38,7 @@ import com.gemstone.gemfire.internal.FileUtil;
 import com.gemstone.gemfire.internal.logging.LogService;
 
 import dunit.RemoteDUnitVMIF;
+import dunit.standalone.ChildVM;
 
 /**
  * @author dsmith
@@ -183,7 +184,7 @@ public class ProcessManager {
       "-Dgemfire.disallowMcastDefaults=true",
       "-ea",
       agent,
-      "dunit.standalone.ChildVM"
+      ChildVM.class.getName()
     };
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
----------------------------------------------------------------------
diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
new file mode 100755
index 0000000..8eec738
--- /dev/null
+++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/ContainerTest.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.test.junit.categories;
+/**
+ * JUnit Test Category that specifies that a test executes within a container
+ * environment such as an OSGi server.
+ *  
+ * @author Kirk Lund
+ */
+public interface ContainerTest {
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
----------------------------------------------------------------------
diff --git a/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
new file mode 100755
index 0000000..4fe535b
--- /dev/null
+++ b/gemfire-junit/src/test/java/com/gemstone/gemfire/test/junit/categories/HydraTest.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.test.junit.categories;
+/**
+ * JUnit Test Category that specifies a hydra test.
+ *  
+ * @author Kirk Lund
+ */
+public interface HydraTest {
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java
deleted file mode 100644
index ab2db78..0000000
--- a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplJUnitPerformanceTest.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *   http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package com.gemstone.gemfire.cache.lucene.internal.repository;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.lucene.analysis.standard.StandardAnalyzer;
-import org.apache.lucene.document.Document;
-import org.apache.lucene.document.Field.Store;
-import org.apache.lucene.document.TextField;
-import org.apache.lucene.index.IndexWriter;
-import org.apache.lucene.index.IndexWriterConfig;
-import org.apache.lucene.index.Term;
-import org.apache.lucene.search.IndexSearcher;
-import org.apache.lucene.search.Query;
-import org.apache.lucene.search.SearcherManager;
-import org.apache.lucene.search.TermQuery;
-import org.apache.lucene.store.RAMDirectory;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import com.gemstone.gemfire.DataSerializable;
-import com.gemstone.gemfire.DataSerializer;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.PartitionAttributesFactory;
-import com.gemstone.gemfire.cache.Region;
-import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
-import com.gemstone.gemfire.cache.lucene.LuceneIndex;
-import com.gemstone.gemfire.cache.lucene.LuceneQuery;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryProvider;
-import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
-import com.gemstone.gemfire.cache.lucene.LuceneService;
-import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
-import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl;
-import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
-import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollector;
-import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
-import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
-import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
-import com.gemstone.gemfire.cache.query.QueryException;
-import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
-
-/**
- * Microbenchmark of the IndexRepository to compare an
- * IndexRepository built on top of cache with a 
- * stock lucene IndexWriter with a RAMDirectory.
- */
-@Category(PerformanceTest.class)
-public class IndexRepositoryImplJUnitPerformanceTest {
-  
-  private static final int NUM_WORDS = 1000;
-  private static int[] COMMIT_INTERVAL = new int[] {100, 1000, 5000};
-  private static int NUM_ENTRIES = 500_000;
-  private static int NUM_QUERIES = 500_000;
-
-  private StandardAnalyzer analyzer = new StandardAnalyzer();
-  
-  @Test
-  public  void testIndexRepository() throws Exception {
-    
-
-    doTest("IndexRepository", new TestCallbacks() {
-
-      private Cache cache;
-      private IndexRepositoryImpl repo;
-      private IndexWriter writer;
-
-      @Override
-      public void addObject(String key, String text) throws Exception {
-        repo.create(key, new TestObject(text));
-      }
-
-      @Override
-      public void commit()  throws Exception {
-        repo.commit();
-      }
-
-      @Override
-      public void init() throws Exception {
-        cache = new CacheFactory().set("mcast-port", "0")
-            .set("log-level", "error")
-            .create();
-        Region<String, File> fileRegion = cache.<String, 
File>createRegionFactory(RegionShortcut.REPLICATE).create("files");
-        Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, 
byte[]>createRegionFactory(RegionShortcut.REPLICATE).create("chunks");
-
-        RegionDirectory dir = new RegionDirectory(fileRegion, chunkRegion);
-        
-        
-        IndexWriterConfig config = new IndexWriterConfig(analyzer);
-        writer = new IndexWriter(dir, config);
-        String[] indexedFields= new String[] {"text"};
-        HeterogenousLuceneSerializer mapper = new HeterogenousLuceneSerializer(indexedFields);
-        repo = new IndexRepositoryImpl(fileRegion, writer, mapper);
-      }
-
-      @Override
-      public void cleanup() throws IOException {
-        writer.close();
-        cache.close();
-      }
-
-      @Override
-      public void waitForAsync() throws Exception {
-        //do nothing
-      }
-
-      @Override
-      public int query(Query query) throws IOException {
-        TopEntriesCollector collector = new TopEntriesCollector();
-        repo.query(query, 100, collector);
-        return collector.size();
-      }
-    });
-  }
-  
-  /**
-   * Test our full lucene index implementation
-   * @throws Exception
-   */
-  @Test
-  public void testLuceneIndex() throws Exception {
-    
-
-    doTest("LuceneIndex", new TestCallbacks() {
-
-      private Cache cache;
-      private Region<String, TestObject> region;
-      private LuceneService service;
-
-      @Override
-      public void addObject(String key, String text) throws Exception {
-        region.create(key, new TestObject(text));
-      }
-
-      @Override
-      public void commit()  throws Exception {
-        //NA
-      }
-
-      @Override
-      public void init() throws Exception {
-        cache = new CacheFactory().set("mcast-port", "0")
-            .set("log-level", "warning")
-            .create();
-        service = LuceneServiceProvider.get(cache);
-        service.createIndex("index", "/region", "text");
-        region = cache.<String, TestObject>createRegionFactory(RegionShortcut.PARTITION)
-            .setPartitionAttributes(new PartitionAttributesFactory<>().setTotalNumBuckets(1).create())
-            .create("region");
-      }
-
-      @Override
-      public void cleanup() throws IOException {
-        cache.close();
-      }
-      
-      @Override
-      public void waitForAsync() throws Exception {
-        AsyncEventQueue aeq = cache.getAsyncEventQueue(LuceneServiceImpl.getUniqueIndexName("index", "/region"));
-        
-        //We will be at most 10 ms off
-        while(aeq.size() > 0) {
-          Thread.sleep(10);
-        }
-      }
-
-      @Override
-      public int query(final Query query) throws Exception {
-        LuceneQuery<Object, Object> luceneQuery = service.createLuceneQueryFactory().create("index", "/region", new LuceneQueryProvider() {
-          
-          @Override
-          public Query getQuery(LuceneIndex index) throws QueryException {
-            return query;
-          }
-        });
-        
-        LuceneQueryResults<Object, Object> results = luceneQuery.search();
-        return results.size();
-      }
-    });
-  }
-  
-  @Test
-  public  void testLuceneWithRegionDirectory() throws Exception {
-    doTest("RegionDirectory", new TestCallbacks() {
-
-      private IndexWriter writer;
-      private SearcherManager searcherManager;
-
-      @Override
-      public void init() throws Exception {
-        RegionDirectory dir = new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey, byte[]>());
-        IndexWriterConfig config = new IndexWriterConfig(analyzer);
-        writer = new IndexWriter(dir, config);
-        searcherManager = new SearcherManager(writer, true, null);
-      }
-
-      @Override
-      public void addObject(String key, String text) throws Exception {
-        Document doc = new Document();
-        doc.add(new TextField("key", key, Store.YES));
-        doc.add(new TextField("text", text, Store.NO));
-        writer.addDocument(doc);
-      }
-
-      @Override
-      public void commit() throws Exception {
-        writer.commit();
-        searcherManager.maybeRefresh();
-      }
-
-      @Override
-      public void cleanup() throws Exception {
-        writer.close();
-      }
-      
-      @Override
-      public void waitForAsync() throws Exception {
-        //do nothing
-      }
-
-      @Override
-      public int query(Query query) throws Exception {
-        IndexSearcher searcher = searcherManager.acquire();
-        try {
-          return searcher.count(query);
-        } finally {
-          searcherManager.release(searcher);
-        }
-      }
-      
-    });
-    
-  }
-  
-  @Test
-  public  void testLucene() throws Exception {
-    doTest("Lucene", new TestCallbacks() {
-
-      private IndexWriter writer;
-      private SearcherManager searcherManager;
-
-      @Override
-      public void init() throws Exception {
-        RAMDirectory dir = new RAMDirectory();
-        IndexWriterConfig config = new IndexWriterConfig(analyzer);
-        writer = new IndexWriter(dir, config);
-        searcherManager = new SearcherManager(writer, true, null);
-      }
-
-      @Override
-      public void addObject(String key, String text) throws Exception {
-        Document doc = new Document();
-        doc.add(new TextField("key", key, Store.YES));
-        doc.add(new TextField("text", text, Store.NO));
-        writer.addDocument(doc);
-      }
-
-      @Override
-      public void commit() throws Exception {
-        writer.commit();
-        searcherManager.maybeRefresh();
-      }
-
-      @Override
-      public void cleanup() throws Exception {
-        writer.close();
-      }
-      
-      @Override
-      public void waitForAsync() throws Exception {
-        //do nothing
-      }
-
-      @Override
-      public int query(Query query) throws Exception {
-        IndexSearcher searcher = searcherManager.acquire();
-        try {
-          return searcher.count(query);
-        } finally {
-          searcherManager.release(searcher);
-        }
-      }
-      
-    });
-    
-  }
-  
-  private void doTest(String testName, TestCallbacks callbacks) throws Exception {
-
-    //Create some random words. We need to be careful
-    //to make sure we get NUM_WORDS distinct words here
-    Set<String> wordSet = new HashSet<String>();
-    Random rand = new Random();
-    while(wordSet.size() < NUM_WORDS) {
-      int length = rand.nextInt(12) + 3;
-      char[] text = new char[length];
-      for(int i = 0; i < length; i++) {
-        text[i] = (char) (rand.nextInt(26) + 97);
-      }
-      wordSet.add(new String(text));
-    }
-    List<String> words = new ArrayList<String>(wordSet.size());
-    words.addAll(wordSet);
-    
-    
-    
-    //warm up
-    writeRandomWords(callbacks, words, rand, NUM_ENTRIES / 10, NUM_QUERIES / 10, COMMIT_INTERVAL[0]);
-    
-    //Do the actual test
-    
-    for(int i = 0; i < COMMIT_INTERVAL.length; i++) {
-      Results results = writeRandomWords(callbacks, words, rand, NUM_ENTRIES, NUM_QUERIES / 10, COMMIT_INTERVAL[i]);
-    
-      System.out.println(testName + " writes(entries=" + NUM_ENTRIES + ", 
commit=" + COMMIT_INTERVAL[i] + "): " + 
TimeUnit.NANOSECONDS.toMillis(results.writeTime));
-      System.out.println(testName + " queries(entries=" + NUM_ENTRIES + ", 
commit=" + COMMIT_INTERVAL[i] + "): " + 
TimeUnit.NANOSECONDS.toMillis(results.queryTime));
-    }
-  }
-
-  private Results writeRandomWords(TestCallbacks callbacks, List<String> words,
-      Random rand, int numEntries, int numQueries, int commitInterval) throws Exception {
-    Results results  = new Results();
-    callbacks.init();
-    int[] counts = new int[words.size()];
-    long start = System.nanoTime();
-    try {
-      for(int i =0; i < numEntries; i++) {
-        int word1 = rand.nextInt(words.size());
-        int word2 = rand.nextInt(words.size());
-        counts[word1]++;
-        counts[word2]++;
-        String value = words.get(word1) + " " + words.get(word2);
-        callbacks.addObject("key" + i, value);
-
-        if(i % commitInterval == 0 && i != 0) {
-          callbacks.commit();
-        }
-      }
-      callbacks.commit();
-      callbacks.waitForAsync();
-      long end = System.nanoTime();
-      results.writeTime = end - start;
-      
-      
-      start = System.nanoTime();
-      for(int i=0; i < numQueries; i++) {
-        int wordIndex = rand.nextInt(words.size());
-        String word = words.get(wordIndex);
-        Query query = new TermQuery(new Term("text", word));
-        int size  = callbacks.query(query);
-//        int size  = callbacks.query(parser.parse(word));
-        //All of my tests sometimes seem to be missing a couple of words, including the stock lucene
-//        assertEquals("Error on query " + i + " word=" + word, counts[wordIndex], size);
-      }
-      end = System.nanoTime();
-      results.queryTime = end - start;
-      
-      return results;
-    } finally {
-      callbacks.cleanup();
-    }
-  }
-
-  private static class TestObject implements DataSerializable {
-    private String text;
-
-    public TestObject() {
-      
-    }
-    
-    public TestObject(String text) {
-      super();
-      this.text = text;
-    }
-
-    @Override
-    public void toData(DataOutput out) throws IOException {
-      DataSerializer.writeString(text, out);
-    }
-
-    @Override
-    public void fromData(DataInput in)
-        throws IOException, ClassNotFoundException {
-      text = DataSerializer.readString(in);
-    }
-
-    @Override
-    public String toString() {
-      return text;
-    }
-    
-    
-  }
-  
-  private interface TestCallbacks {
-    public void init() throws Exception;
-    public int query(Query query) throws Exception;
-    public void addObject(String key, String text)  throws Exception;
-    public void commit() throws Exception;
-    public void waitForAsync() throws Exception;
-    public void cleanup() throws Exception;
-  }
-  
-  private static class Results {
-    long writeTime;
-    long queryTime;
-  }
-}

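The file removed above is re-added below as IndexRepositoryImplPerformanceTest, with the same body plus an @Ignore annotation, since the benchmark makes no assertions.

Both versions drive every backend through the same TestCallbacks interface: each variant (IndexRepository, LuceneIndex, RegionDirectory, stock Lucene) supplies only init/addObject/commit/waitForAsync/query/cleanup, while the doTest harness owns data generation and timing. As a sketch of how another backend would plug in, here is a hypothetical no-op implementation; it is illustrative only and not part of this patch:

    // Hypothetical backend that indexes nothing; useful only for
    // measuring the overhead of the harness itself.
    private TestCallbacks noopBackend() {
      return new TestCallbacks() {
        @Override public void init() { /* nothing to set up */ }
        @Override public void addObject(String key, String text) { /* discard */ }
        @Override public void commit() { /* nothing buffered */ }
        @Override public void waitForAsync() { /* all work is synchronous */ }
        @Override public int query(Query query) { return 0; }
        @Override public void cleanup() { /* nothing to release */ }
      };
    }

    // Usage inside the test class: doTest("Noop", noopBackend());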
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/c5efb805/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
----------------------------------------------------------------------
diff --git a/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
new file mode 100644
index 0000000..74f3742
--- /dev/null
+++ b/gemfire-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/repository/IndexRepositoryImplPerformanceTest.java
@@ -0,0 +1,439 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.gemstone.gemfire.cache.lucene.internal.repository;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.lucene.analysis.standard.StandardAnalyzer;
+import org.apache.lucene.document.Document;
+import org.apache.lucene.document.Field.Store;
+import org.apache.lucene.document.TextField;
+import org.apache.lucene.index.IndexWriter;
+import org.apache.lucene.index.IndexWriterConfig;
+import org.apache.lucene.index.Term;
+import org.apache.lucene.search.IndexSearcher;
+import org.apache.lucene.search.Query;
+import org.apache.lucene.search.SearcherManager;
+import org.apache.lucene.search.TermQuery;
+import org.apache.lucene.store.RAMDirectory;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue;
+import com.gemstone.gemfire.cache.lucene.LuceneIndex;
+import com.gemstone.gemfire.cache.lucene.LuceneQuery;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryProvider;
+import com.gemstone.gemfire.cache.lucene.LuceneQueryResults;
+import com.gemstone.gemfire.cache.lucene.LuceneService;
+import com.gemstone.gemfire.cache.lucene.LuceneServiceProvider;
+import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl;
+import com.gemstone.gemfire.cache.lucene.internal.directory.RegionDirectory;
+import com.gemstone.gemfire.cache.lucene.internal.distributed.TopEntriesCollector;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey;
+import com.gemstone.gemfire.cache.lucene.internal.filesystem.File;
+import com.gemstone.gemfire.cache.lucene.internal.repository.serializer.HeterogenousLuceneSerializer;
+import com.gemstone.gemfire.cache.query.QueryException;
+import com.gemstone.gemfire.test.junit.categories.PerformanceTest;
+
+/**
+ * Microbenchmark comparing an IndexRepository built on top of the
+ * cache with a stock Lucene IndexWriter backed by a RAMDirectory.
+ */
+@Category(PerformanceTest.class)
+@Ignore("Tests have no assertions")
+public class IndexRepositoryImplPerformanceTest {
+  
+  private static final int NUM_WORDS = 1000;
+  private static int[] COMMIT_INTERVAL = new int[] {100, 1000, 5000};
+  private static int NUM_ENTRIES = 500_000;
+  private static int NUM_QUERIES = 500_000;
+
+  private StandardAnalyzer analyzer = new StandardAnalyzer();
+  
+  @Test
+  public  void testIndexRepository() throws Exception {
+    
+
+    doTest("IndexRepository", new TestCallbacks() {
+
+      private Cache cache;
+      private IndexRepositoryImpl repo;
+      private IndexWriter writer;
+
+      @Override
+      public void addObject(String key, String text) throws Exception {
+        repo.create(key, new TestObject(text));
+      }
+
+      @Override
+      public void commit()  throws Exception {
+        repo.commit();
+      }
+
+      @Override
+      public void init() throws Exception {
+        cache = new CacheFactory().set("mcast-port", "0")
+            .set("log-level", "error")
+            .create();
+        Region<String, File> fileRegion = cache.<String, File>createRegionFactory(RegionShortcut.REPLICATE).create("files");
+        Region<ChunkKey, byte[]> chunkRegion = cache.<ChunkKey, byte[]>createRegionFactory(RegionShortcut.REPLICATE).create("chunks");
+
+        RegionDirectory dir = new RegionDirectory(fileRegion, chunkRegion);
+        
+        
+        IndexWriterConfig config = new IndexWriterConfig(analyzer);
+        writer = new IndexWriter(dir, config);
+        String[] indexedFields= new String[] {"text"};
+        HeterogenousLuceneSerializer mapper = new HeterogenousLuceneSerializer(indexedFields);
+        repo = new IndexRepositoryImpl(fileRegion, writer, mapper);
+      }
+
+      @Override
+      public void cleanup() throws IOException {
+        writer.close();
+        cache.close();
+      }
+
+      @Override
+      public void waitForAsync() throws Exception {
+        //do nothing
+      }
+
+      @Override
+      public int query(Query query) throws IOException {
+        TopEntriesCollector collector = new TopEntriesCollector();
+        repo.query(query, 100, collector);
+        return collector.size();
+      }
+    });
+  }
+  
+  /**
+   * Test our full lucene index implementation
+   * @throws Exception
+   */
+  @Test
+  public void testLuceneIndex() throws Exception {
+    
+
+    doTest("LuceneIndex", new TestCallbacks() {
+
+      private Cache cache;
+      private Region<String, TestObject> region;
+      private LuceneService service;
+
+      @Override
+      public void addObject(String key, String text) throws Exception {
+        region.create(key, new TestObject(text));
+      }
+
+      @Override
+      public void commit()  throws Exception {
+        //NA
+      }
+
+      @Override
+      public void init() throws Exception {
+        cache = new CacheFactory().set("mcast-port", "0")
+            .set("log-level", "warning")
+            .create();
+        service = LuceneServiceProvider.get(cache);
+        service.createIndex("index", "/region", "text");
+        region = cache.<String, TestObject>createRegionFactory(RegionShortcut.PARTITION)
+            .setPartitionAttributes(new PartitionAttributesFactory<>().setTotalNumBuckets(1).create())
+            .create("region");
+      }
+
+      @Override
+      public void cleanup() throws IOException {
+        cache.close();
+      }
+      
+      @Override
+      public void waitForAsync() throws Exception {
+        AsyncEventQueue aeq = cache.getAsyncEventQueue(LuceneServiceImpl.getUniqueIndexName("index", "/region"));
+        
+        //We will be at most 10 ms off
+        while(aeq.size() > 0) {
+          Thread.sleep(10);
+        }
+      }
+
+      @Override
+      public int query(final Query query) throws Exception {
+        LuceneQuery<Object, Object> luceneQuery = service.createLuceneQueryFactory().create("index", "/region", new LuceneQueryProvider() {
+          
+          @Override
+          public Query getQuery(LuceneIndex index) throws QueryException {
+            return query;
+          }
+        });
+        
+        LuceneQueryResults<Object, Object> results = luceneQuery.search();
+        return results.size();
+      }
+    });
+  }
+  
+  @Test
+  public  void testLuceneWithRegionDirectory() throws Exception {
+    doTest("RegionDirectory", new TestCallbacks() {
+
+      private IndexWriter writer;
+      private SearcherManager searcherManager;
+
+      @Override
+      public void init() throws Exception {
+        RegionDirectory dir = new RegionDirectory(new ConcurrentHashMap<String, File>(), new ConcurrentHashMap<ChunkKey, byte[]>());
+        IndexWriterConfig config = new IndexWriterConfig(analyzer);
+        writer = new IndexWriter(dir, config);
+        searcherManager = new SearcherManager(writer, true, null);
+      }
+
+      @Override
+      public void addObject(String key, String text) throws Exception {
+        Document doc = new Document();
+        doc.add(new TextField("key", key, Store.YES));
+        doc.add(new TextField("text", text, Store.NO));
+        writer.addDocument(doc);
+      }
+
+      @Override
+      public void commit() throws Exception {
+        writer.commit();
+        searcherManager.maybeRefresh();
+      }
+
+      @Override
+      public void cleanup() throws Exception {
+        writer.close();
+      }
+      
+      @Override
+      public void waitForAsync() throws Exception {
+        //do nothing
+      }
+
+      @Override
+      public int query(Query query) throws Exception {
+        IndexSearcher searcher = searcherManager.acquire();
+        try {
+          return searcher.count(query);
+        } finally {
+          searcherManager.release(searcher);
+        }
+      }
+      
+    });
+    
+  }
+  
+  @Test
+  public  void testLucene() throws Exception {
+    doTest("Lucene", new TestCallbacks() {
+
+      private IndexWriter writer;
+      private SearcherManager searcherManager;
+
+      @Override
+      public void init() throws Exception {
+        RAMDirectory dir = new RAMDirectory();
+        IndexWriterConfig config = new IndexWriterConfig(analyzer);
+        writer = new IndexWriter(dir, config);
+        searcherManager = new SearcherManager(writer, true, null);
+      }
+
+      @Override
+      public void addObject(String key, String text) throws Exception {
+        Document doc = new Document();
+        doc.add(new TextField("key", key, Store.YES));
+        doc.add(new TextField("text", text, Store.NO));
+        writer.addDocument(doc);
+      }
+
+      @Override
+      public void commit() throws Exception {
+        writer.commit();
+        searcherManager.maybeRefresh();
+      }
+
+      @Override
+      public void cleanup() throws Exception {
+        writer.close();
+      }
+      
+      @Override
+      public void waitForAsync() throws Exception {
+        //do nothing
+      }
+
+      @Override
+      public int query(Query query) throws Exception {
+        IndexSearcher searcher = searcherManager.acquire();
+        try {
+          return searcher.count(query);
+        } finally {
+          searcherManager.release(searcher);
+        }
+      }
+      
+    });
+    
+  }
+  
+  private void doTest(String testName, TestCallbacks callbacks) throws Exception {
+
+    //Create some random words. We need to be careful
+    //to make sure we get NUM_WORDS distinct words here
+    Set<String> wordSet = new HashSet<String>();
+    Random rand = new Random();
+    while(wordSet.size() < NUM_WORDS) {
+      int length = rand.nextInt(12) + 3;
+      char[] text = new char[length];
+      for(int i = 0; i < length; i++) {
+        text[i] = (char) (rand.nextInt(26) + 97);
+      }
+      wordSet.add(new String(text));
+    }
+    List<String> words = new ArrayList<String>(wordSet.size());
+    words.addAll(wordSet);
+    
+    
+    
+    //warm up
+    writeRandomWords(callbacks, words, rand, NUM_ENTRIES / 10, NUM_QUERIES / 10, COMMIT_INTERVAL[0]);
+    
+    //Do the actual test
+    
+    for(int i = 0; i < COMMIT_INTERVAL.length; i++) {
+      Results results = writeRandomWords(callbacks, words, rand, NUM_ENTRIES, NUM_QUERIES / 10, COMMIT_INTERVAL[i]);
+    
+      System.out.println(testName + " writes(entries=" + NUM_ENTRIES + ", 
commit=" + COMMIT_INTERVAL[i] + "): " + 
TimeUnit.NANOSECONDS.toMillis(results.writeTime));
+      System.out.println(testName + " queries(entries=" + NUM_ENTRIES + ", 
commit=" + COMMIT_INTERVAL[i] + "): " + 
TimeUnit.NANOSECONDS.toMillis(results.queryTime));
+    }
+  }
+
+  private Results writeRandomWords(TestCallbacks callbacks, List<String> words,
+      Random rand, int numEntries, int numQueries, int commitInterval) throws Exception {
+    Results results  = new Results();
+    callbacks.init();
+    int[] counts = new int[words.size()];
+    long start = System.nanoTime();
+    try {
+      for(int i =0; i < numEntries; i++) {
+        int word1 = rand.nextInt(words.size());
+        int word2 = rand.nextInt(words.size());
+        counts[word1]++;
+        counts[word2]++;
+        String value = words.get(word1) + " " + words.get(word2);
+        callbacks.addObject("key" + i, value);
+
+        if(i % commitInterval == 0 && i != 0) {
+          callbacks.commit();
+        }
+      }
+      callbacks.commit();
+      callbacks.waitForAsync();
+      long end = System.nanoTime();
+      results.writeTime = end - start;
+      
+      
+      start = System.nanoTime();
+      for(int i=0; i < numQueries; i++) {
+        int wordIndex = rand.nextInt(words.size());
+        String word = words.get(wordIndex);
+        Query query = new TermQuery(new Term("text", word));
+        int size  = callbacks.query(query);
+//        int size  = callbacks.query(parser.parse(word));
+        //All of my tests sometimes seem to be missing a couple of words, including the stock lucene
+//        assertEquals("Error on query " + i + " word=" + word, counts[wordIndex], size);
+      }
+      end = System.nanoTime();
+      results.queryTime = end - start;
+      
+      return results;
+    } finally {
+      callbacks.cleanup();
+    }
+  }
+
+  private static class TestObject implements DataSerializable {
+    private String text;
+
+    public TestObject() {
+      
+    }
+    
+    public TestObject(String text) {
+      super();
+      this.text = text;
+    }
+
+    @Override
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeString(text, out);
+    }
+
+    @Override
+    public void fromData(DataInput in)
+        throws IOException, ClassNotFoundException {
+      text = DataSerializer.readString(in);
+    }
+
+    @Override
+    public String toString() {
+      return text;
+    }
+    
+    
+  }
+  
+  private interface TestCallbacks {
+    public void init() throws Exception;
+    public int query(Query query) throws Exception;
+    public void addObject(String key, String text)  throws Exception;
+    public void commit() throws Exception;
+    public void waitForAsync() throws Exception;
+    public void cleanup() throws Exception;
+  }
+  
+  private static class Results {
+    long writeTime;
+    long queryTime;
+  }
+}
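One caveat in the LuceneIndex variant above: waitForAsync() polls aeq.size() every 10 ms with no upper bound, so a queue that never drains would hang the benchmark indefinitely. A deadline-based sketch, written against the same AsyncEventQueue API the test already uses (the helper name and timeout are illustrative), fails fast instead:

    // Sketch: drain the queue, but give up after a fixed deadline.
    private void waitForQueue(AsyncEventQueue aeq, long timeoutMillis) throws InterruptedException {
      long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (aeq.size() > 0) {
        if (System.nanoTime() > deadline) {
          throw new IllegalStateException(aeq.size() + " events still queued after " + timeoutMillis + " ms");
        }
        Thread.sleep(10); // same 10 ms polling granularity as the original loop
      }
    }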

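The commented-out assertEquals in writeRandomWords records a real observation: every backend, stock Lucene included, occasionally returns fewer hits than the tracked counts, which suggests the discrepancy lies in the harness or refresh timing rather than in the Geode-backed repository. If verification were re-enabled, a tolerant aggregate check would avoid failing the run on a single missing word; the sketch below is illustrative only (assertTrue comes from org.junit.Assert, and the 1% bound is an arbitrary choice):

    // Sketch: tally mismatches across the query phase instead of
    // failing on the first one.
    int mismatches = 0;
    for (int i = 0; i < numQueries; i++) {
      int wordIndex = rand.nextInt(words.size());
      Query query = new TermQuery(new Term("text", words.get(wordIndex)));
      if (callbacks.query(query) != counts[wordIndex]) {
        mismatches++;
      }
    }
    assertTrue("too many mismatched term counts: " + mismatches, mismatches < numQueries / 100);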