GEODE-1915: Prevent deadlock registering instantiators with gateways

Don't hold a lock while distributing instantiators. This prevents the
deadlock because incoming registrations won't wait for registrations
that are being distributed.

This change might cause instantiators to be distributed in a different
order that they were registered in, but that's ok because the order in
which different instantiators are registered is not important.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4f2e2774
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4f2e2774
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4f2e2774

Branch: refs/heads/feature/e2e-testing
Commit: 4f2e27749b0f9e94d0bfe7399fddbe1e9041ecdb
Parents: 331fc17
Author: Dan Smith <upthewatersp...@apache.org>
Authored: Tue Sep 20 18:00:34 2016 -0700
Committer: Dan Smith <upthewatersp...@apache.org>
Committed: Thu Sep 22 12:13:40 2016 -0700

----------------------------------------------------------------------
 .../geode/internal/InternalInstantiator.java    |  71 +++++++------
 .../geode/internal/cache/wan/WANTestBase.java   |  28 ++---
 .../SerialGatewaySenderOperationsDUnitTest.java | 106 ++++++++++++++++++-
 3 files changed, 160 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f2e2774/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java 
b/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java
index bb15097..b985885 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java
@@ -88,7 +88,7 @@ public class InternalInstantiator {
    * Registers an <code>Instantiator</code> with the data
    * serialization framework.
    */
-  public static synchronized void register(Instantiator instantiator,
+  public static void register(Instantiator instantiator,
                                            boolean distribute) {
     // [sumedh] Skip the checkForThread() check if the instantiation has not
     // to be distributed. This allows instantiations from ServerConnection
@@ -112,7 +112,7 @@ public class InternalInstantiator {
    * @throws IllegalStateException
    *         The instantiator cannot be registered
    */
-  private static synchronized void _register(Instantiator instantiator, 
boolean distribute)
+  private static void _register(Instantiator instantiator, boolean distribute)
   {
     if (instantiator == null) {
       throw new 
NullPointerException(LocalizedStrings.InternalInstantiator_CANNOT_REGISTER_A_NULL_INSTANTIATOR.toLocalizedString());
@@ -130,41 +130,50 @@ public class InternalInstantiator {
       }
     }
     final Integer idx = Integer.valueOf(classId);
-    
-    boolean retry;
-    do {
-      retry = false;
-      Object oldInst = idsToInstantiators.putIfAbsent(idx, instantiator);
-      if (oldInst != null) {
-        if (oldInst instanceof Marker) {
-          retry = !idsToInstantiators.replace(idx, oldInst, instantiator);
-          if (!retry) {
-            dsMap.put(cName, instantiator);
-            ((Marker) oldInst).setInstantiator(instantiator);
+
+    synchronized(InternalInstantiator.class) {
+      boolean retry;
+      do {
+        retry = false;
+        Object oldInst = idsToInstantiators.putIfAbsent(idx, instantiator);
+        if (oldInst != null) {
+          if (oldInst instanceof Marker) {
+            retry = !idsToInstantiators.replace(idx, oldInst, instantiator);
+            if (!retry) {
+              dsMap.put(cName, instantiator);
+              ((Marker) oldInst).setInstantiator(instantiator);
+            }
           }
-        } else {
-          Class oldClass =
-            ((Instantiator) oldInst).getInstantiatedClass();
-          if (!oldClass.getName().equals(cName)) {
-            throw new 
IllegalStateException(LocalizedStrings.InternalInstantiator_CLASS_ID_0_IS_ALREADY_REGISTERED_FOR_CLASS_1_SO_IT_COULD_NOT_BE_REGISTED_FOR_CLASS_2.toLocalizedString(new
 Object[] {Integer.valueOf(classId), oldClass.getName(), cName}));
-          } else {
-            return; // it was already registered
+          else {
+            Class oldClass =
+              ((Instantiator) oldInst).getInstantiatedClass();
+            if (!oldClass.getName().equals(cName)) {
+              throw new IllegalStateException(
+                
LocalizedStrings.InternalInstantiator_CLASS_ID_0_IS_ALREADY_REGISTERED_FOR_CLASS_1_SO_IT_COULD_NOT_BE_REGISTED_FOR_CLASS_2
+                  .toLocalizedString(new Object[] { Integer.valueOf(classId), 
oldClass.getName(), cName }));
+            }
+            else {
+              return; // it was already registered
+            }
           }
         }
-      } else {
-        dsMap.put(cName, instantiator);
+        else {
+          dsMap.put(cName, instantiator);
+        }
+      }
+      while (retry);
+
+      // if instantiator is getting registered for first time
+      // its EventID will be null, so generate a new event id
+      // the the distributed system is connected
+      GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+      if (cache != null && instantiator.getEventId() == null) {
+        instantiator.setEventId(new EventID(cache.getDistributedSystem()));
       }
-    } while (retry);
 
-    // if instantiator is getting registered for first time
-    // its EventID will be null, so generate a new event id
-    // the the distributed system is connected
-    GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
-    if (cache != null && instantiator.getEventId() == null) {
-      instantiator.setEventId(new EventID(cache.getDistributedSystem()));
+      
logger.info(LocalizedMessage.create(LocalizedStrings.InternalInstantiator_REGISTERED,
 new Object[] { Integer.valueOf(classId), c.getName()} ));
     }
-    
-    
logger.info(LocalizedMessage.create(LocalizedStrings.InternalInstantiator_REGISTERED,
 new Object[] { Integer.valueOf(classId), c.getName()} ));
+
     if (distribute) { // originated in this VM
       // send a message to other peers telling them about a newly-registered
       // instantiator, it also send event id of the originator along with the

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f2e2774/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
index 303b6d6..ff903d4 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java
@@ -2140,6 +2140,21 @@ public class WANTestBase extends 
JUnit4DistributedTestCase {
 
   public static void createClientWithLocator(int port0,String host,
       String regionName) {
+    createClientWithLocator(port0, host);
+
+    AttributesFactory factory = new AttributesFactory();
+    factory.setPoolName("pool");
+    factory.setDataPolicy(DataPolicy.NORMAL);
+    RegionAttributes attrs = factory.create();
+    region = cache.createRegion(regionName, attrs);
+    region.registerInterest("ALL_KEYS");
+    assertNotNull(region);
+    LogWriterUtils.getLogWriter().info(
+        "Distributed Region " + regionName + " created Successfully :"
+            + region.toString());
+  }
+
+  public static void createClientWithLocator(final int port0, final String 
host) {
     WANTestBase test = new WANTestBase();
     Properties props = test.getDistributedSystemProperties();
     props.setProperty(MCAST_PORT, "0");
@@ -2156,21 +2171,10 @@ public class WANTestBase extends 
JUnit4DistributedTestCase {
           .setPingInterval(250).setSubscriptionEnabled(true)
           .setSubscriptionRedundancy(-1).setReadTimeout(2000)
           .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10)
-          .setRetryAttempts(3).create(regionName);
+          .setRetryAttempts(3).create("pool");
     } finally {
       CacheServerTestUtil.enableShufflingOfEndpoints();
     }
-
-    AttributesFactory factory = new AttributesFactory();
-    factory.setPoolName(p.getName());
-    factory.setDataPolicy(DataPolicy.NORMAL);
-    RegionAttributes attrs = factory.create();
-    region = cache.createRegion(regionName, attrs);
-    region.registerInterest("ALL_KEYS");
-    assertNotNull(region);
-    LogWriterUtils.getLogWriter().info(
-        "Distributed Region " + regionName + " created Successfully :"
-            + region.toString());
   }
 
   public static int createReceiver_PDX(int locPort) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f2e2774/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
index f20bf56..0522e46 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java
@@ -16,6 +16,9 @@
  */
 package org.apache.geode.internal.cache.wan.serial;
 
+import org.apache.geode.DataSerializable;
+import org.apache.geode.DataSerializer;
+import org.apache.geode.Instantiator;
 import org.junit.experimental.categories.Category;
 import org.junit.Test;
 
@@ -25,6 +28,10 @@ import 
org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
 import org.apache.geode.test.junit.categories.DistributedTest;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -606,9 +613,45 @@ public class SerialGatewaySenderOperationsDUnitTest 
extends WANTestBase {
     vm1.invoke(check);
     
   }
-  
 
-  
+  @Test
+  public void registeringInstantiatorsInGatewayShouldNotCauseDeadlock() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+        "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+        "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(() -> createReceiverAndServer( nyPort ));
+    vm3.invoke(() -> createReceiverAndServer( lnPort ));
+
+    vm2.invoke(() -> createSender( "ln", 1, false, 100, 10, false, false, 
null, false));
+    vm3.invoke(() -> createSender( "ny", 2, false, 100, 10, false, false, 
null, false ));
+
+    vm4.invoke(() -> createClientWithLocator(nyPort, "localhost"));
+    
+    // Register instantiator
+    vm4.invoke(() -> Instantiator.register(new TestObjectInstantiator()));
+  }
+
+  @Test
+  public void registeringDataSerializableInGatewayShouldNotCauseDeadlock() {
+    Integer lnPort = (Integer)vm0.invoke(WANTestBase.class,
+      "createFirstLocatorWithDSId", new Object[] { 1 });
+    Integer nyPort = (Integer)vm1.invoke(WANTestBase.class,
+      "createFirstRemoteLocator", new Object[] { 2, lnPort });
+
+    vm2.invoke(() -> createReceiverAndServer( nyPort ));
+    vm3.invoke(() -> createReceiverAndServer( lnPort ));
+
+    vm2.invoke(() -> createSender( "ln", 1, false, 100, 10, false, false, 
null, false));
+    vm3.invoke(() -> createSender( "ny", 2, false, 100, 10, false, false, 
null, false ));
+
+    vm4.invoke(() -> createClientWithLocator(nyPort, "localhost"));
+
+    // Register instantiator
+    vm4.invoke(() -> DataSerializer.register(TestObjectDataSerializer.class));
+  }
+
   public static void verifySenderPausedState(String senderId) {
     Set<GatewaySender> senders = cache.getGatewaySenders();
     AbstractGatewaySender sender = null;
@@ -662,4 +705,63 @@ public class SerialGatewaySenderOperationsDUnitTest 
extends WANTestBase {
     sender.resume();
     sender.stop();
   }
+  
+  static class TestObjectInstantiator extends Instantiator {
+
+    TestObjectInstantiator(Class<TestObject> c, byte id) {
+      super(c, id);
+    }
+
+    TestObjectInstantiator() {
+      this(TestObject.class, (byte) 99);
+    }
+
+    public DataSerializable newInstance() {
+      return new TestObject();
+    }
+  }
+
+  static class TestObjectDataSerializer extends DataSerializer implements 
Serializable {
+
+    @Override
+    public Class<?>[] getSupportedClasses() {
+      return new Class<?>[] {TestObject.class};
+    }
+
+    @Override
+    public boolean toData(final Object o, final DataOutput out) throws 
IOException {
+      return o instanceof TestObject;
+    }
+
+    @Override
+    public Object fromData(final DataInput in) throws IOException, 
ClassNotFoundException {
+      return new TestObject();
+    }
+
+    @Override
+    public int getId() {
+      return 99;
+    }
+  }
+
+  static class TestObject implements DataSerializable {
+    
+    private static final long serialVersionUID = 1L;
+    
+    protected String id;
+
+    public TestObject() {}
+
+    public TestObject(String id) {
+      this.id = id;
+    }
+
+    public void toData(DataOutput out) throws IOException {
+      DataSerializer.writeString(this.id, out);
+    }
+
+    public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+      this.id = DataSerializer.readString(in);
+    }
+  }
 }

Reply via email to