GEODE-1915: Prevent deadlock registering instantiators with gateways Don't hold a lock while distributing instantiators. This prevents the deadlock because incoming registrations won't wait for registrations that are being distributed.
This change might cause instantiators to be distributed in a different order that they were registered in, but that's ok because the order in which different instantiators are registered is not important. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4f2e2774 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4f2e2774 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4f2e2774 Branch: refs/heads/feature/e2e-testing Commit: 4f2e27749b0f9e94d0bfe7399fddbe1e9041ecdb Parents: 331fc17 Author: Dan Smith <upthewatersp...@apache.org> Authored: Tue Sep 20 18:00:34 2016 -0700 Committer: Dan Smith <upthewatersp...@apache.org> Committed: Thu Sep 22 12:13:40 2016 -0700 ---------------------------------------------------------------------- .../geode/internal/InternalInstantiator.java | 71 +++++++------ .../geode/internal/cache/wan/WANTestBase.java | 28 ++--- .../SerialGatewaySenderOperationsDUnitTest.java | 106 ++++++++++++++++++- 3 files changed, 160 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f2e2774/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java b/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java index bb15097..b985885 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java +++ b/geode-core/src/main/java/org/apache/geode/internal/InternalInstantiator.java @@ -88,7 +88,7 @@ public class InternalInstantiator { * Registers an <code>Instantiator</code> with the data * serialization framework. */ - public static synchronized void register(Instantiator instantiator, + public static void register(Instantiator instantiator, boolean distribute) { // [sumedh] Skip the checkForThread() check if the instantiation has not // to be distributed. This allows instantiations from ServerConnection @@ -112,7 +112,7 @@ public class InternalInstantiator { * @throws IllegalStateException * The instantiator cannot be registered */ - private static synchronized void _register(Instantiator instantiator, boolean distribute) + private static void _register(Instantiator instantiator, boolean distribute) { if (instantiator == null) { throw new NullPointerException(LocalizedStrings.InternalInstantiator_CANNOT_REGISTER_A_NULL_INSTANTIATOR.toLocalizedString()); @@ -130,41 +130,50 @@ public class InternalInstantiator { } } final Integer idx = Integer.valueOf(classId); - - boolean retry; - do { - retry = false; - Object oldInst = idsToInstantiators.putIfAbsent(idx, instantiator); - if (oldInst != null) { - if (oldInst instanceof Marker) { - retry = !idsToInstantiators.replace(idx, oldInst, instantiator); - if (!retry) { - dsMap.put(cName, instantiator); - ((Marker) oldInst).setInstantiator(instantiator); + + synchronized(InternalInstantiator.class) { + boolean retry; + do { + retry = false; + Object oldInst = idsToInstantiators.putIfAbsent(idx, instantiator); + if (oldInst != null) { + if (oldInst instanceof Marker) { + retry = !idsToInstantiators.replace(idx, oldInst, instantiator); + if (!retry) { + dsMap.put(cName, instantiator); + ((Marker) oldInst).setInstantiator(instantiator); + } } - } else { - Class oldClass = - ((Instantiator) oldInst).getInstantiatedClass(); - if (!oldClass.getName().equals(cName)) { - throw new IllegalStateException(LocalizedStrings.InternalInstantiator_CLASS_ID_0_IS_ALREADY_REGISTERED_FOR_CLASS_1_SO_IT_COULD_NOT_BE_REGISTED_FOR_CLASS_2.toLocalizedString(new Object[] {Integer.valueOf(classId), oldClass.getName(), cName})); - } else { - return; // it was already registered + else { + Class oldClass = + ((Instantiator) oldInst).getInstantiatedClass(); + if (!oldClass.getName().equals(cName)) { + throw new IllegalStateException( + LocalizedStrings.InternalInstantiator_CLASS_ID_0_IS_ALREADY_REGISTERED_FOR_CLASS_1_SO_IT_COULD_NOT_BE_REGISTED_FOR_CLASS_2 + .toLocalizedString(new Object[] { Integer.valueOf(classId), oldClass.getName(), cName })); + } + else { + return; // it was already registered + } } } - } else { - dsMap.put(cName, instantiator); + else { + dsMap.put(cName, instantiator); + } + } + while (retry); + + // if instantiator is getting registered for first time + // its EventID will be null, so generate a new event id + // the the distributed system is connected + GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); + if (cache != null && instantiator.getEventId() == null) { + instantiator.setEventId(new EventID(cache.getDistributedSystem())); } - } while (retry); - // if instantiator is getting registered for first time - // its EventID will be null, so generate a new event id - // the the distributed system is connected - GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); - if (cache != null && instantiator.getEventId() == null) { - instantiator.setEventId(new EventID(cache.getDistributedSystem())); + logger.info(LocalizedMessage.create(LocalizedStrings.InternalInstantiator_REGISTERED, new Object[] { Integer.valueOf(classId), c.getName()} )); } - - logger.info(LocalizedMessage.create(LocalizedStrings.InternalInstantiator_REGISTERED, new Object[] { Integer.valueOf(classId), c.getName()} )); + if (distribute) { // originated in this VM // send a message to other peers telling them about a newly-registered // instantiator, it also send event id of the originator along with the http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f2e2774/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 303b6d6..ff903d4 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -2140,6 +2140,21 @@ public class WANTestBase extends JUnit4DistributedTestCase { public static void createClientWithLocator(int port0,String host, String regionName) { + createClientWithLocator(port0, host); + + AttributesFactory factory = new AttributesFactory(); + factory.setPoolName("pool"); + factory.setDataPolicy(DataPolicy.NORMAL); + RegionAttributes attrs = factory.create(); + region = cache.createRegion(regionName, attrs); + region.registerInterest("ALL_KEYS"); + assertNotNull(region); + LogWriterUtils.getLogWriter().info( + "Distributed Region " + regionName + " created Successfully :" + + region.toString()); + } + + public static void createClientWithLocator(final int port0, final String host) { WANTestBase test = new WANTestBase(); Properties props = test.getDistributedSystemProperties(); props.setProperty(MCAST_PORT, "0"); @@ -2156,21 +2171,10 @@ public class WANTestBase extends JUnit4DistributedTestCase { .setPingInterval(250).setSubscriptionEnabled(true) .setSubscriptionRedundancy(-1).setReadTimeout(2000) .setSocketBufferSize(1000).setMinConnections(6).setMaxConnections(10) - .setRetryAttempts(3).create(regionName); + .setRetryAttempts(3).create("pool"); } finally { CacheServerTestUtil.enableShufflingOfEndpoints(); } - - AttributesFactory factory = new AttributesFactory(); - factory.setPoolName(p.getName()); - factory.setDataPolicy(DataPolicy.NORMAL); - RegionAttributes attrs = factory.create(); - region = cache.createRegion(regionName, attrs); - region.registerInterest("ALL_KEYS"); - assertNotNull(region); - LogWriterUtils.getLogWriter().info( - "Distributed Region " + regionName + " created Successfully :" - + region.toString()); } public static int createReceiver_PDX(int locPort) { http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4f2e2774/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java index f20bf56..0522e46 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderOperationsDUnitTest.java @@ -16,6 +16,9 @@ */ package org.apache.geode.internal.cache.wan.serial; +import org.apache.geode.DataSerializable; +import org.apache.geode.DataSerializer; +import org.apache.geode.Instantiator; import org.junit.experimental.categories.Category; import org.junit.Test; @@ -25,6 +28,10 @@ import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.Serializable; import java.util.HashSet; import java.util.Set; @@ -606,9 +613,45 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { vm1.invoke(check); } - - + @Test + public void registeringInstantiatorsInGatewayShouldNotCauseDeadlock() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(() -> createReceiverAndServer( nyPort )); + vm3.invoke(() -> createReceiverAndServer( lnPort )); + + vm2.invoke(() -> createSender( "ln", 1, false, 100, 10, false, false, null, false)); + vm3.invoke(() -> createSender( "ny", 2, false, 100, 10, false, false, null, false )); + + vm4.invoke(() -> createClientWithLocator(nyPort, "localhost")); + + // Register instantiator + vm4.invoke(() -> Instantiator.register(new TestObjectInstantiator())); + } + + @Test + public void registeringDataSerializableInGatewayShouldNotCauseDeadlock() { + Integer lnPort = (Integer)vm0.invoke(WANTestBase.class, + "createFirstLocatorWithDSId", new Object[] { 1 }); + Integer nyPort = (Integer)vm1.invoke(WANTestBase.class, + "createFirstRemoteLocator", new Object[] { 2, lnPort }); + + vm2.invoke(() -> createReceiverAndServer( nyPort )); + vm3.invoke(() -> createReceiverAndServer( lnPort )); + + vm2.invoke(() -> createSender( "ln", 1, false, 100, 10, false, false, null, false)); + vm3.invoke(() -> createSender( "ny", 2, false, 100, 10, false, false, null, false )); + + vm4.invoke(() -> createClientWithLocator(nyPort, "localhost")); + + // Register instantiator + vm4.invoke(() -> DataSerializer.register(TestObjectDataSerializer.class)); + } + public static void verifySenderPausedState(String senderId) { Set<GatewaySender> senders = cache.getGatewaySenders(); AbstractGatewaySender sender = null; @@ -662,4 +705,63 @@ public class SerialGatewaySenderOperationsDUnitTest extends WANTestBase { sender.resume(); sender.stop(); } + + static class TestObjectInstantiator extends Instantiator { + + TestObjectInstantiator(Class<TestObject> c, byte id) { + super(c, id); + } + + TestObjectInstantiator() { + this(TestObject.class, (byte) 99); + } + + public DataSerializable newInstance() { + return new TestObject(); + } + } + + static class TestObjectDataSerializer extends DataSerializer implements Serializable { + + @Override + public Class<?>[] getSupportedClasses() { + return new Class<?>[] {TestObject.class}; + } + + @Override + public boolean toData(final Object o, final DataOutput out) throws IOException { + return o instanceof TestObject; + } + + @Override + public Object fromData(final DataInput in) throws IOException, ClassNotFoundException { + return new TestObject(); + } + + @Override + public int getId() { + return 99; + } + } + + static class TestObject implements DataSerializable { + + private static final long serialVersionUID = 1L; + + protected String id; + + public TestObject() {} + + public TestObject(String id) { + this.id = id; + } + + public void toData(DataOutput out) throws IOException { + DataSerializer.writeString(this.id, out); + } + + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + this.id = DataSerializer.readString(in); + } + } }