[GEODE-2324] DUnit test for AcceptorImpl.close()

* Add AcceptorImplObserver
* Add AcceptorImplDUnitTest


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/1d561dcc
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/1d561dcc
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/1d561dcc

Branch: refs/heads/develop
Commit: 1d561dccaf8736972c6d355244b0fe54b9bd7a8b
Parents: cae580f
Author: Galen O'Sullivan <gosulli...@pivotal.io>
Authored: Mon Jan 23 21:49:35 2017 -0800
Committer: Bruce Schuchardt <bschucha...@pivotal.io>
Committed: Thu Feb 9 14:28:48 2017 -0800

----------------------------------------------------------------------
 .../cache/tier/sockets/AcceptorImpl.java        |  12 ++
 .../sockets/command/AcceptorImplObserver.java   |  47 ++++++
 .../tier/sockets/AcceptorImplDUnitTest.java     | 164 +++++++++++++++++++
 .../tier/sockets/AcceptorImplJUnitTest.java     |  42 ++---
 4 files changed, 239 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
index 37438b6..d72c72b 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java
@@ -57,6 +57,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.ssl.SSLException;
 
+import 
org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CancelException;
@@ -1528,8 +1529,12 @@ public class AcceptorImpl extends Acceptor implements Runnable {
 
   @Override
   public void close() {
+    AcceptorImplObserver acceptorImplObserver = 
AcceptorImplObserver.getInstance();
     try {
       synchronized (syncLock) {
+        if (acceptorImplObserver != null) {
+          acceptorImplObserver.beforeClose(this);
+        }
         if (!isRunning()) {
           return;
         }
@@ -1608,9 +1613,16 @@ public class AcceptorImpl extends Acceptor implements Runnable {
             }
           }
         }
+        if (acceptorImplObserver != null) {
+          acceptorImplObserver.normalCloseTermination(this);
+        }
       } // synchronized
     } catch (RuntimeException e) {/* ignore and log */
       
logger.warn(LocalizedMessage.create(LocalizedStrings.AcceptorImpl_UNEXPECTED), 
e);
+    } finally {
+      if (acceptorImplObserver != null) {
+        acceptorImplObserver.afterClose(this);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java
new file mode 100644
index 0000000..f5ee982
--- /dev/null
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/AcceptorImplObserver.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets.command;
+
+import org.apache.geode.internal.cache.tier.sockets.AcceptorImpl;
+
+/**
+ * AcceptorImplObserver is an observer/visitor for AcceptorImpl that is used for testing.
+ */
+public abstract class AcceptorImplObserver {
+  private static AcceptorImplObserver instance;
+
+  /**
+   * Set the instance of the observer. Setting to null will clear the observer.
+   *
+   * @param instance
+   * @return the old observer, or null if there was no old observer.
+   */
+  public static final AcceptorImplObserver setInstance(AcceptorImplObserver 
instance) {
+    AcceptorImplObserver oldInstance = AcceptorImplObserver.instance;
+    AcceptorImplObserver.instance = instance;
+    return oldInstance;
+  }
+
+  public static final AcceptorImplObserver getInstance() {
+    return instance;
+  }
+
+  public void beforeClose(AcceptorImpl acceptorImpl) {}
+
+  public void normalCloseTermination(AcceptorImpl acceptorImpl) {}
+
+  public void afterClose(AcceptorImpl acceptorImpl) {}
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
new file mode 100644
index 0000000..8b4c672
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplDUnitTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.internal.cache.tier.sockets;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.EntryEvent;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.util.CacheWriterAdapter;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.InternalCache;
+import 
org.apache.geode.internal.cache.tier.sockets.command.AcceptorImplObserver;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
+import org.apache.geode.test.junit.categories.DistributedTest;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.util.Arrays;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests for AcceptorImpl.
+ */
+@Category(DistributedTest.class)
+public class AcceptorImplDUnitTest extends JUnit4DistributedTestCase {
+  private static Cache cache;
+
+  public AcceptorImplDUnitTest() {
+    super();
+  }
+
+  @Override
+  public void postTearDown() throws Exception {
+    if (cache != null) {
+      cache.close();
+      cache = null;
+    }
+    super.postTearDown();
+  }
+
+  public static class SleepyCacheWriter<K, V> extends CacheWriterAdapter<K, V> 
{
+    @Override
+    public void beforeCreate(EntryEvent<K, V> event) {
+      while (true) {
+        System.out.println("Sleeping a long time.");
+        try {
+          Thread.sleep(100000000);
+        } catch (InterruptedException ignore) {
+        }
+      }
+    }
+  }
+
+  /**
+   * Dump threads to standard out. For debugging.
+   */
+  private void dumpThreads() {
+    ThreadMXBean bean = ManagementFactory.getThreadMXBean();
+    ThreadInfo[] infos = bean.dumpAllThreads(true, true);
+    System.out.println("infos = " + Arrays.toString(infos));
+  }
+
+  /**
+   * GEODE-2324. There was a bug where, due to an uncaught exception, `AcceptorImpl.close()` was
+   * short-circuiting and failing to clean up properly.
+   *
+   * What this test does is start a Cache and hook the Acceptor to interrupt the thread before the
+   * place where an InterruptedException could be thrown. It interrupts the thread, and checks that
+   * the thread has terminated normally without short-circuiting. It doesn't check that every part
+   * of the AcceptorImpl has shut down properly -- that seems both difficult to check (especially
+   * since the fields are private) and implementation-dependent.
+   */
+  @Test
+  public void testShutdownCatchesException() throws Exception {
+    final String hostname = Host.getHost(0).getHostName();
+    final VM clientVM = Host.getHost(0).getVM(0);
+
+    // AtomicBooleans can be set from wherever they are, including an 
anonymous class or other
+    // thread.
+    AtomicBoolean terminatedNormally = new AtomicBoolean(false);
+    AtomicBoolean passedPostConditions = new AtomicBoolean(false);
+
+    Properties props = new Properties();
+    props.setProperty(MCAST_PORT, "0");
+
+    AcceptorImplObserver.setInstance(new AcceptorImplObserver() {
+      @Override
+      public void beforeClose(AcceptorImpl acceptorImpl) {
+        Thread.currentThread().interrupt();
+      }
+
+      @Override
+      public void normalCloseTermination(AcceptorImpl acceptorImpl) {
+        terminatedNormally.set(true);
+      }
+
+      @Override
+      public void afterClose(AcceptorImpl acceptorImpl) {
+        passedPostConditions.set(!acceptorImpl.isRunning());
+      }
+    });
+
+    try (InternalCache cache = (InternalCache) new 
CacheFactory(props).create()) {
+      RegionFactory<Object, Object> regionFactory =
+          cache.createRegionFactory(RegionShortcut.PARTITION);
+
+      regionFactory.setCacheWriter(new SleepyCacheWriter<>());
+
+      final CacheServer server = cache.addCacheServer();
+      final int port = AvailablePortHelper.getRandomAvailableTCPPort();
+      server.setPort(port);
+      server.start();
+
+      regionFactory.create("region1");
+
+      clientVM.invokeAsync(() -> {
+        ClientCacheFactory clientCacheFactory = new ClientCacheFactory();
+        clientCacheFactory.addPoolServer(hostname, port);
+        ClientCache clientCache = clientCacheFactory.create();
+        Region<Object, Object> clientRegion1 =
+            
clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create("region1");
+        clientRegion1.put("foo", "bar");
+      });
+
+      cache.close();
+
+      dumpThreads();
+      assertTrue(terminatedNormally.get());
+      assertTrue(passedPostConditions.get());
+
+      // cleanup.
+      AcceptorImplObserver.setInstance(null);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/geode/blob/1d561dcc/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
index 7aa11b7..58c2157 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImplJUnitTest.java
@@ -15,9 +15,13 @@
 package org.apache.geode.internal.cache.tier.sockets;
 
 import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.client.ClientCache;
+import org.apache.geode.cache.client.ClientCacheFactory;
 import org.apache.geode.cache.client.ServerRefusedConnectionException;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.DistributedSystem;
@@ -42,30 +46,16 @@ import java.net.BindException;
 import java.net.Socket;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.Set;
 
+import static 
org.apache.geode.distributed.ConfigurationProperties.CACHE_XML_FILE;
 import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
+import static 
org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID.system;
 import static org.junit.Assert.*;
 
 @Category({IntegrationTest.class, ClientServerTest.class})
 public class AcceptorImplJUnitTest {
 
-  DistributedSystem system;
-  InternalCache cache;
-
-  @Before
-  public void setUp() throws Exception {
-    Properties p = new Properties();
-    p.setProperty(MCAST_PORT, "0");
-    this.system = DistributedSystem.connect(p);
-    this.cache = (InternalCache) CacheFactory.create(system);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    this.cache.close();
-    this.system.disconnect();
-  }
-
   /*
    * Test method for 
'org.apache.geode.internal.cache.tier.sockets.AcceptorImpl(int, int, boolean,
    * int, Cache)'
@@ -74,15 +64,16 @@ public class AcceptorImplJUnitTest {
   @Test
   public void testConstructor() throws CacheException, IOException {
     AcceptorImpl a1 = null, a2 = null, a3 = null;
-    try {
+    Properties props = new Properties();
+    props.setProperty(MCAST_PORT, "0");
+    try (InternalCache cache = (InternalCache) new 
CacheFactory(props).create()) {
       final int[] freeTCPPorts = 
AvailablePortHelper.getRandomAvailableTCPPorts(2);
       int port1 = freeTCPPorts[0];
       int port2 = freeTCPPorts[1];
 
-
       try {
         new AcceptorImpl(port1, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS - 1, 
CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
             null, null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
@@ -92,7 +83,7 @@ public class AcceptorImplJUnitTest {
 
       try {
         new AcceptorImpl(port2, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache, 0,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache, 0,
             CacheServer.DEFAULT_MAX_THREADS, 
CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT,
             CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null, null, false, 
Collections.EMPTY_LIST,
             CacheServer.DEFAULT_TCP_NO_DELAY);
@@ -102,12 +93,12 @@ public class AcceptorImplJUnitTest {
 
       try {
         a1 = new AcceptorImpl(port1, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, 
CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
             null, null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
         a2 = new AcceptorImpl(port1, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
+            CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
             AcceptorImpl.MINIMUM_MAX_CONNECTIONS, 
CacheServer.DEFAULT_MAX_THREADS,
             CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE,
             null, null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
@@ -116,13 +107,12 @@ public class AcceptorImplJUnitTest {
       }
 
       a3 = new AcceptorImpl(port2, null, false, 
CacheServer.DEFAULT_SOCKET_BUFFER_SIZE,
-          CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, this.cache,
+          CacheServer.DEFAULT_MAXIMUM_TIME_BETWEEN_PINGS, cache,
           AcceptorImpl.MINIMUM_MAX_CONNECTIONS, 
CacheServer.DEFAULT_MAX_THREADS,
           CacheServer.DEFAULT_MAXIMUM_MESSAGE_COUNT, 
CacheServer.DEFAULT_MESSAGE_TIME_TO_LIVE, null,
           null, false, Collections.EMPTY_LIST, 
CacheServer.DEFAULT_TCP_NO_DELAY);
       assertEquals(port2, a3.getPort());
-      InternalDistributedSystem isystem =
-          (InternalDistributedSystem) this.cache.getDistributedSystem();
+      InternalDistributedSystem isystem = (InternalDistributedSystem) 
cache.getDistributedSystem();
       DistributionConfig config = isystem.getConfig();
       String bindAddress = config.getBindAddress();
       if (bindAddress == null || bindAddress.length() <= 0) {

Reply via email to