[GEODE-2324] keep private variables private.

* Update AcceptorImplDUnit test,
* Refactor AcceptorImpl.close into multiple methods.
* Remove a commented-out method.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/d73ec978
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/d73ec978
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/d73ec978

Branch: refs/heads/develop
Commit: d73ec978476ec4bf835c38d1713e14b48324515f
Parents: 6bed282
Author: Galen O'Sullivan <gosulli...@pivotal.io>
Authored: Wed Jan 25 11:30:07 2017 -0800
Committer: Bruce Schuchardt <bschucha...@pivotal.io>
Committed: Thu Feb 9 14:28:48 2017 -0800

----------------------------------------------------------------------
 .../cache/tier/sockets/AcceptorImpl.java        | 202 +++++++-----------
 .../sockets/command/AcceptorImplObserver.java   |  29 ---
 .../tier/sockets/AcceptorImplDUnitTest.java     | 205 +++++++++++++------
 .../tier/sockets/AcceptorImplJUnitTest.java     |  42 ++--
 4 files changed, 243 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 1bca1fd..5fa8096 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLException;
 
-import 
org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -207,7 +206,7 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
   public final AtomicInteger clientServerCnxCount = new AtomicInteger();
 
   /** Has this acceptor been shut down */
-  private volatile boolean shutdown = false;
+  private volatile boolean shutdownStarted = false;
 
   /** The thread that runs the acceptor */
   private Thread thread = null;
@@ -271,33 +270,6 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
 
   private SecurityService securityService = 
IntegratedSecurityService.getSecurityService();
 
-  // Assumed non-null. Do not set this to null.
-  private static AcceptorImplObserver 
acceptorImplObserver_do_not_access_directly =
-      new AcceptorImplObserver() {
-        @Override
-        public void beforeClose(AcceptorImpl acceptorImpl) {}
-
-        @Override
-        public void normalCloseTermination(AcceptorImpl acceptorImpl) {}
-
-        @Override
-        public void afterClose(AcceptorImpl acceptorImpl) {}
-      };
-
-  private static AcceptorImplObserver getAcceptorImplObserver() {
-    synchronized (AcceptorImpl.class) {
-      return acceptorImplObserver_do_not_access_directly;
-    }
-  }
-
-  public static void setObserver_TESTONLY(AcceptorImplObserver observer) {
-    synchronized (AcceptorImpl.class) {
-      if (observer != null) {
-        acceptorImplObserver_do_not_access_directly = observer;
-      }
-    }
-  }
-
   /**
    * Initializes this acceptor thread to listen for connections on the given 
port.
    * 
@@ -1551,19 +1523,17 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
 
   @Override
   public boolean isRunning() {
-    return !this.shutdown;
+    return !this.shutdownStarted;
   }
 
   @Override
   public void close() {
-    AcceptorImplObserver acceptorImplObserver = getAcceptorImplObserver();
     try {
       synchronized (syncLock) {
-        acceptorImplObserver.beforeClose(this);
         if (!isRunning()) {
           return;
         }
-        this.shutdown = true;
+        this.shutdownStarted = true;
         logger.info(LocalizedMessage.create(
             
LocalizedStrings.AcceptorImpl_CACHE_SERVER_ON_PORT_0_IS_SHUTTING_DOWN, 
this.localPort));
         if (this.thread != null) {
@@ -1572,81 +1542,87 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
         try {
           this.serverSock.close();
         } catch (IOException ignore) {
-          // Well, we tried. Continue shutting down.
         }
+
         crHelper.setShutdown(true); // set this before shutting down the pool
-        if (isSelector()) {
-          this.hsTimer.cancel();
-          if (this.tmpSel != null) {
-            try {
-              this.tmpSel.close();
-            } catch (IOException ignore) {
-            }
-          }
-          try {
-            wakeupSelector();
-            this.selector.close();
-          } catch (IOException ignore) {
-          }
-          if (this.selectorThread != null) {
-            this.selectorThread.interrupt();
-          }
-          this.commBufferQueue.clear();
-        }
+        shutdownSelectorIfIsSelector();
         ClientHealthMonitor.shutdownInstance();
         shutdownSCs();
         this.clientNotifier.shutdown(this.acceptorId);
-        this.pool.shutdown();
-
-        try {
-          if (!this.pool.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS)) {
-            logger.warn(LocalizedMessage.create(
-                
LocalizedStrings.PoolImpl_TIMEOUT_WAITING_FOR_BACKGROUND_TASKS_TO_COMPLETE));
-            this.pool.shutdownNow();
-          }
-        } catch (InterruptedException ignore) {
-          Thread.currentThread().interrupt();
-          this.pool.shutdownNow();
-        }
-        this.hsPool.shutdownNow();
+        shutdownPools();
         this.stats.close();
-        GemFireCacheImpl myCache = (GemFireCacheImpl) cache;
-        if (!myCache.forcedDisconnect()) {
-          Set<PartitionedRegion> prs = myCache.getPartitionedRegions();
-          for (PartitionedRegion pr : prs) {
-            Map<Integer, BucketAdvisor.BucketProfile> profiles =
-                new HashMap<Integer, BucketAdvisor.BucketProfile>();
-            // get all local real bucket advisors
-            Map<Integer, BucketAdvisor> advisors = 
pr.getRegionAdvisor().getAllBucketAdvisors();
-            for (Map.Entry<Integer, BucketAdvisor> entry : 
advisors.entrySet()) {
-              BucketAdvisor advisor = entry.getValue();
-              BucketProfile bp = (BucketProfile) advisor.createProfile();
-              advisor.updateServerBucketProfile(bp);
-              profiles.put(entry.getKey(), bp);
-            }
-            Set receipients = new HashSet();
-            receipients = pr.getRegionAdvisor().adviseAllPRNodes();
-            // send it to all in one messgae
-            ReplyProcessor21 reply = 
AllBucketProfilesUpdateMessage.send(receipients,
-                pr.getDistributionManager(), pr.getPRId(), profiles, true);
-            if (reply != null) {
-              reply.waitForRepliesUninterruptibly();
-            }
-
-            if (logger.isDebugEnabled()) {
-              logger.debug("sending messages to all peers for removing this 
server..");
-            }
-          }
-        }
-        acceptorImplObserver.normalCloseTermination(this);
+        notifyCacheMembersOfClose();
       } // synchronized
     } catch (RuntimeException e) {/* ignore and log */
       
logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED), 
e);
-    } finally {
-      acceptorImplObserver.afterClose(this);
     }
   }
 
+  private void notifyCacheMembersOfClose() {
+    GemFireCacheImpl myCache = (GemFireCacheImpl) cache;
+    if (!myCache.forcedDisconnect()) {
+      for (PartitionedRegion pr : myCache.getPartitionedRegions()) {
+        Map<Integer, BucketAdvisor.BucketProfile> profiles = new HashMap<>();
+        // get all local real bucket advisors
+        Map<Integer, BucketAdvisor> advisors = 
pr.getRegionAdvisor().getAllBucketAdvisors();
+        for (Map.Entry<Integer, BucketAdvisor> entry : advisors.entrySet()) {
+          BucketAdvisor advisor = entry.getValue();
+          BucketProfile bp = (BucketProfile) advisor.createProfile();
+          advisor.updateServerBucketProfile(bp);
+          profiles.put(entry.getKey(), bp);
+        }
+
+        Set recipients = pr.getRegionAdvisor().adviseAllPRNodes();
+        // send it to all in one message
+        ReplyProcessor21 reply = 
AllBucketProfilesUpdateMessage.send(recipients,
+            pr.getDistributionManager(), pr.getPRId(), profiles, true);
+        if (reply != null) {
+          reply.waitForRepliesUninterruptibly();
+        }
+
+        if (logger.isDebugEnabled()) {
+          logger.debug("sending messages to all peers for removing this 
server..");
+        }
+      }
+    }
+  }
+
+  private void shutdownSelectorIfIsSelector() {
+    if (isSelector()) {
+      this.hsTimer.cancel();
+      if (this.tmpSel != null) {
+        try {
+          this.tmpSel.close();
+        } catch (IOException ignore) {
+        }
+      }
+      try {
+        wakeupSelector();
+        this.selector.close();
+      } catch (IOException ignore) {
+      }
+      if (this.selectorThread != null) {
+        this.selectorThread.interrupt();
+      }
+      this.commBufferQueue.clear();
+    }
+  }
+
+  private void shutdownPools() {
+    this.pool.shutdown();
+    try {
+      if (!this.pool.awaitTermination(PoolImpl.SHUTDOWN_TIMEOUT, 
TimeUnit.MILLISECONDS)) {
+        logger.warn(LocalizedMessage
+            
.create(LocalizedStrings.PoolImpl_TIMEOUT_WAITING_FOR_BACKGROUND_TASKS_TO_COMPLETE));
+        this.pool.shutdownNow();
+      }
+    } catch (InterruptedException ignore) {
+      Thread.currentThread().interrupt();
+      this.pool.shutdownNow();
+    }
+    this.hsPool.shutdownNow();
+  }
+
   private void shutdownSCs() {
     // added to fix part 2 of bug 37351.
     synchronized (this.allSCsLock) {
@@ -1657,36 +1633,12 @@ public class AcceptorImpl extends Acceptor implements 
Runnable {
     }
   }
 
+  public boolean isShutdownProperly() {
+    return !isRunning() && (selectorThread == null || 
!selectorThread.isAlive())
+        && (pool == null || pool.isShutdown()) && (hsPool == null || 
hsPool.isShutdown())
+        && (selector == null || !selector.isOpen());
+  }
 
-  // protected InetAddress getBindAddress() {
-  // return this.bindAddress;
-  // }
-
-  // /**
-  // * Calculates the bind address based on gemfire.properties.
-  // * Returns null if no bind address is configured.
-  // * @since GemFire 5.7
-  // */
-  // public static InetAddress calcBindAddress(Cache cache) throws IOException 
{
-  // InternalDistributedSystem system = (InternalDistributedSystem)cache
-  // .getDistributedSystem();
-  // DistributionConfig config = system.getConfig();
-  // InetAddress address = null;
-
-  // // Get the server-bind-address. If it is not null, use it.
-  // // If it is null, get the bind-address. If it is not null, use it.
-  // // Otherwise set default.
-  // String serverBindAddress = config.getServerBindAddress();
-  // if (serverBindAddress != null && serverBindAddress.length() > 0) {
-  // address = InetAddress.getByName(serverBindAddress);
-  // } else {
-  // String bindAddress = config.getBindAddress();
-  // if (bindAddress != null && bindAddress.length() > 0) {
-  // address = InetAddress.getByName(bindAddress);
-  // }
-  // }
-  // return address;
-  // }
   /**
    * @param bindName the ip address or host name that this acceptor should 
bind to. If null or ""
    *        then calculate it.

http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java
deleted file mode 100644
index 3d02878..0000000
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
- * agreements. See the NOTICE file distributed with this work for additional 
information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the 
License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software 
distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
- * or implied. See the License for the specific language governing permissions 
and limitations under
- * the License.
- */
-
-package org.apache.geode.internal.cache.tier.sockets.command;
-
-import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
-
-/**
- * AcceptorImplObserver is an observer/visitor for AcceptorImpl that is used 
for testing.
- */
-public interface AcceptorImplObserver {
-  void beforeClose(AcceptorImpl acceptorImpl);
-
-  void normalCloseTermination(AcceptorImpl acceptorImpl);
-
-  void afterClose(AcceptorImpl acceptorImpl);
-}

http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
index ca8592f..810aabc 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
@@ -15,9 +15,10 @@
 
 package org.apache.geode.internal.cache.tier.sockets;
 
-import org.apache.geode.cache.Cache;
+import com.jayway.awaitility.Awaitility;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
@@ -27,22 +28,21 @@ import org.apache.geode.cache.client.ClientRegionShortcut;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache.util.CacheWriterAdapter;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.CacheServerImpl;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
-import 
org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver;
 import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.ThreadUtils;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.lang.management.ManagementFactory;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.util.Arrays;
+import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 
 import static org.junit.Assert.*;
@@ -52,44 +52,79 @@ import static org.junit.Assert.*;
  */
 @Category(DistributedTest.class)
 public class AcceptorImplDUnitTest extends JUnit4DistributedTestCase {
-  private static Cache cache;
 
   public AcceptorImplDUnitTest() {
     super();
   }
 
-  @Override
-  public void postTearDown() throws Exception {
-    if (cache != null) {
-      cache.close();
-      cache = null;
+  // SleepyCacheWriter will block indefinitely.
+  // Anyone who has a handle on the SleepyCacheWriter can interrupt it by 
calling wakeUp.
+  class SleepyCacheWriter<K, V> extends CacheWriterAdapter<K, V> {
+    private boolean setOnStart;
+    private boolean setOnInterrupt;
+    private boolean stopWaiting;
+    // locks the above three booleans.
+    private final Object lock = new Object();
+
+    public void notifyStart() {
+      synchronized (lock) {
+        setOnStart = true;
+      }
+    }
+
+    public boolean isStarted() {
+      synchronized (lock) {
+        return setOnStart;
+      }
+    }
+
+    public void notifyInterrupt() {
+      synchronized (lock) {
+        setOnInterrupt = true;
+      }
+    }
+
+    public boolean isInterrupted() {
+      synchronized (lock) {
+        return setOnInterrupt;
+      }
+    }
+
+    public void stopWaiting() {
+      synchronized (lock) {
+        this.stopWaiting = true;
+        lock.notify();
+      }
+    }
+
+    public boolean isReadyToQuit() {
+      synchronized (lock) {
+        return stopWaiting;
+      }
     }
-    super.postTearDown();
-  }
 
-  public static class SleepyCacheWriter<K, V> extends CacheWriterAdapter<K, V> 
{
+    SleepyCacheWriter() {}
+
     @Override
     public void beforeCreate(EntryEvent<K, V> event) {
-      while (true) {
-        System.out.println("Sleeping a long time.");
+      System.out.println("Sleeping a long time.");
+      notifyStart();
+      while (!isReadyToQuit()) {
         try {
-          Thread.sleep(100000000);
-        } catch (InterruptedException ignore) {
+          synchronized (lock) {
+            lock.wait();
+          }
+        } catch (InterruptedException ex) {
+          notifyInterrupt();
         }
       }
+      if (isInterrupted()) {
+        Thread.currentThread().interrupt();
+      }
     }
   }
 
   /**
-   * Dump threads to standard out. For debugging.
-   */
-  private void dumpThreads() {
-    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
-    ThreadInfo[] infos = bean.dumpAllThreads(true, true);
-    System.out.println("infos = " + Arrays.toString(infos));
-  }
-
-  /**
    * GEODE-2324. There was a bug where, due to an uncaught exception, 
`AcceptorImpl.close()` was
    * short-circuiting and failing to clean up properly.
    *
@@ -100,40 +135,19 @@ public class AcceptorImplDUnitTest extends 
JUnit4DistributedTestCase {
    * since the fields are private) and implementation-dependent.
    */
   @Test
-  public void testShutdownCatchesException() throws Exception {
+  public void testAcceptorImplCloseCleansUpWithHangingConnection() throws 
Exception {
     final String hostname = Host.getHost(0).getHostName();
     final VM clientVM = Host.getHost(0).getVM(0);
 
-    // AtomicBooleans can be set from wherever they are, including an 
anonymous class or other
-    // thread.
-    AtomicBoolean terminatedNormally = new AtomicBoolean(false);
-    AtomicBoolean passedPostConditions = new AtomicBoolean(false);
-
     Properties props = new Properties();
     props.setProperty(MCAST_PORT, "0");
 
-    AcceptorImpl.setObserver_TESTONLY(new AcceptorImplObserver() {
-      @Override
-      public void beforeClose(AcceptorImpl acceptorImpl) {
-        Thread.currentThread().interrupt();
-      }
-
-      @Override
-      public void normalCloseTermination(AcceptorImpl acceptorImpl) {
-        terminatedNormally.set(true);
-      }
-
-      @Override
-      public void afterClose(AcceptorImpl acceptorImpl) {
-        passedPostConditions.set(!acceptorImpl.isRunning());
-      }
-    });
-
     try (InternalCache cache = (InternalCache) new 
CacheFactory(props).create()) {
       RegionFactory<Object, Object> regionFactory =
           cache.createRegionFactory(RegionShortcut.PARTITION);
 
-      regionFactory.setCacheWriter(new SleepyCacheWriter<>());
+      SleepyCacheWriter<Object, Object> sleepyCacheWriter = new 
SleepyCacheWriter<>();
+      regionFactory.setCacheWriter(sleepyCacheWriter);
 
       final CacheServer server = cache.addCacheServer();
       final int port = AvailablePortHelper.getRandomAvailableTCPPort();
@@ -142,32 +156,93 @@ public class AcceptorImplDUnitTest extends 
JUnit4DistributedTestCase {
 
       regionFactory.create("region1");
 
+      assertTrue(cache.isServer());
+      assertFalse(cache.isClosed());
+
+      Awaitility.await("Acceptor is up and running").atMost(10, SECONDS)
+          .until(() -> getAcceptorImplFromCache(cache) != null);
+      AcceptorImpl acceptorImpl = getAcceptorImplFromCache(cache);
+
+
       clientVM.invokeAsync(() -> {
+        // System.setProperty("gemfire.PoolImpl.TRY_SERVERS_ONCE", "true");
         ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
         clientCacheFactory.addPoolServer(hostname, port);
+        clientCacheFactory.setPoolReadTimeout(5000);
+        clientCacheFactory.setPoolRetryAttempts(1);
+        clientCacheFactory.setPoolMaxConnections(1);
+        clientCacheFactory.setPoolFreeConnectionTimeout(1000);
         ClientCache clientCache = clientCacheFactory.create();
         Region<Object, Object> clientRegion1 =
             
clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("region1");
         clientRegion1.put("foo", "bar");
       });
 
+      Awaitility.await("Cache writer starts").atMost(10, SECONDS)
+          .until(sleepyCacheWriter::isStarted);
+
       cache.close();
 
-      dumpThreads();
-      assertTrue(terminatedNormally.get());
-      assertTrue(passedPostConditions.get());
+      Awaitility.await("Cache writer interrupted").atMost(10, SECONDS)
+          .until(sleepyCacheWriter::isInterrupted);
 
-      // cleanup.
-      AcceptorImpl.setObserver_TESTONLY(new AcceptorImplObserver() {
-        @Override
-        public void beforeClose(AcceptorImpl acceptorImpl) {}
+      sleepyCacheWriter.stopWaiting();
 
-        @Override
-        public void normalCloseTermination(AcceptorImpl acceptorImpl) {}
+      Awaitility.await("Acceptor shuts down properly").atMost(10, SECONDS)
+          .until(() -> acceptorImpl.isShutdownProperly());
 
-        @Override
-        public void afterClose(AcceptorImpl acceptorImpl) {}
-      });
+      ThreadUtils.dumpMyThreads(); // for debugging.
+
+      regionFactory.setCacheWriter(null);
+    }
+  }
+
+
+  @Test
+  public void testAcceptorImplCloseCleansUp() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(MCAST_PORT, "0");
+
+    try (InternalCache cache = (InternalCache) new 
CacheFactory(props).create()) {
+      RegionFactory<Object, Object> regionFactory =
+          cache.createRegionFactory(RegionShortcut.PARTITION);
+
+      final CacheServer server = cache.addCacheServer();
+      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+      server.setPort(port);
+      server.start();
+
+      regionFactory.create("region1");
+
+      assertTrue(cache.isServer());
+      assertFalse(cache.isClosed());
+      Awaitility.await("Acceptor is up and running").atMost(10, SECONDS)
+          .until(() -> getAcceptorImplFromCache(cache) != null);
+
+      AcceptorImpl acceptorImpl = getAcceptorImplFromCache(cache);
+
+      cache.close();
+      Awaitility.await("Acceptor shuts down properly").atMost(10, SECONDS)
+          .until(acceptorImpl::isShutdownProperly);
+
+      assertTrue(cache.isClosed());
+      assertFalse(acceptorImpl.isRunning());
+    }
+  }
+
+  /**
+   *
+   * @param cache
+   * @return the cache's Acceptor, if there is exactly one CacheServer. 
Otherwise null.
+   */
+  public AcceptorImpl getAcceptorImplFromCache(GemFireCache cache) {
+    GemFireCacheImpl gemFireCache = (GemFireCacheImpl) cache;
+    List<CacheServer> cacheServers = gemFireCache.getCacheServers();
+    if (cacheServers.size() != 1) {
+      return null;
     }
+
+    CacheServerImpl cacheServerImpl = (CacheServerImpl) cacheServers.get(0);
+    return cacheServerImpl.getAcceptor();
   }
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/d73ec978/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 58c2157..7aa11b7 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -15,13 +15,9 @@
 package org.apache.geode.internal.cache.tier.sockets;
 
 import org.apache.geode.cache.AttributesFactory;
-import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionFactory;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedSystem;
@@ -46,16 +42,30 @@ import java.net.BindException;
 import java.net.Socket;
 import java.util.Collections;
 import java.util.Properties;
-import java.util.Set;
 
-import static 
org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
-import static 
org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID.system;
 import static org.junit.Assert.*;
 
 @Category({IntegrationTest.class, ClientServerTest.class})
 public class AcceptorImplJUnitTest {
 
+  DistributedSystem system;
+  InternalCache cache;
+
+  @Before
+  public void setUp() throws Exception {
+    Properties p = new Properties();
+    p.setProperty(MCAST_PORT, "0");
+    this.system = DistributedSystem.connect(p);
+    this.cache = (InternalCache) CacheFactory.create(system);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    this.cache.close();
+    this.system.disconnect();
+  }
+
   /*
    * Test method for 
'org.apache.geode.internal.cache.tier.sockets.AcceptorImpl(int, int, boolean,
    * int, Cache)'
@@ -64,16 +74,15 @@ public class AcceptorImplJUnitTest {
   @Test
   public void testConstructor() throws CacheException, IOException {
     AcceptorImpl a1 = null, a2 = null, a3 = null;
-    Properties props = new Properties();
-    props.setProperty(MCAST_PORT, "0");
-    try (InternalCache cache = (InternalCache) new 
CacheFactory(props).create()) {
+    try {
       final int[] freeTCPPorts = 
AvailablePortHelper.getRandomAvailableTCPPorts(2);
       int port1 = freeTCPPorts[0];
       int port2 = freeTCPPorts[1];
 
+
       try {
         new AcceptorImpl(port1, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, 
CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
             null, null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
@@ -83,7 +92,7 @@ public class AcceptorImplJUnitTest {
 
       try {
         new AcceptorImpl(port2, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, 0,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0,
             CacheServer.DEFAULT_MAX_THREADS, 
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
             CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, 
Collections.EMPTY_LIST,
             CacheServer.DEFAULT_TCP_NO_DELAY);
@@ -93,12 +102,12 @@ public class AcceptorImplJUnitTest {
 
       try {
         a1 = new AcceptorImpl(port1, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, 
CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
             null, null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
         a2 = new AcceptorImpl(port1, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, 
CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
             null, null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
@@ -107,12 +116,13 @@ public class AcceptorImplJUnitTest {
       }
 
       a3 = new AcceptorImpl(port2, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-          CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
+          CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS, 
CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
           null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
       assertEquals(port2, a3.getPort());
-      InternalDistributedSystem isystem = (InternalDistributedSystem) 
cache.getDistributedSystem();
+      InternalDistributedSystem isystem =
+          (InternalDistributedSystem) this.cache.getDistributedSystem();
       DistributionConfig config = isystem.getConfig();
       String bindAddress = config.getBindAddress();
       if (bindAddress == null || bindAddress.length() <= 0) {

Reply via email to