This is an automated email from the ASF dual-hosted git repository. jchen21 pushed a commit to branch support/1.13 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.13 by this push: new 35d2305 GEODE-8671: Two threads calling get and retrieve the same PdxInstance, resulting in corruption (#5925) 35d2305 is described below commit 35d230586db7d5a4eee8b91ee93ce0b4ee2c7c67 Author: Jianxia Chen <11181423+jche...@users.noreply.github.com> AuthorDate: Wed Mar 3 09:19:36 2021 -0800 GEODE-8671: Two threads calling get and retrieve the same PdxInstance, resulting in corruption (#5925) For PdxInstance, return a new reference in LocalRegion.optimizedGetObject(), instead of using the value in the Future. This is to avoid Pdx corruption when multiple threads share the same reference of PdxInstance. (cherry picked from commit dabb610b74bb0b27603d7803ec3cdd1cbb16c43f) --- .../cache/RegionConcurrentOperationDUnitTest.java | 102 ++++++++++++++++----- .../apache/geode/internal/cache/LocalRegion.java | 33 ++++--- .../geode/internal/cache/LocalRegionTest.java | 43 +++++++++ 3 files changed, 146 insertions(+), 32 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java index d0ccd1d..cda199d 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/RegionConcurrentOperationDUnitTest.java @@ -14,41 +14,42 @@ */ package org.apache.geode.internal.cache; +import static org.apache.geode.cache.RegionShortcut.PARTITION; import static org.apache.geode.cache.RegionShortcut.REPLICATE; import static org.apache.geode.cache.RegionShortcut.REPLICATE_PROXY; import static org.apache.geode.test.dunit.VM.getVM; import static org.assertj.core.api.Assertions.assertThat; import java.io.Serializable; +import java.time.Duration; +import java.util.concurrent.Callable; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import org.junit.After; import org.junit.Rule; import org.junit.Test; +import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.CacheLoader; import org.apache.geode.cache.DataPolicy; import org.apache.geode.cache.LoaderHelper; import org.apache.geode.cache.Region; import org.apache.geode.cache.Scope; -import org.apache.geode.test.dunit.DUnitBlackboard; +import org.apache.geode.pdx.PdxReader; +import org.apache.geode.pdx.PdxSerializable; +import org.apache.geode.pdx.PdxWriter; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.CacheRule; +import org.apache.geode.test.dunit.rules.DistributedBlackboard; import org.apache.geode.test.dunit.rules.DistributedRule; import org.apache.geode.test.junit.rules.ExecutorServiceRule; public class RegionConcurrentOperationDUnitTest implements Serializable { - private static DUnitBlackboard blackboard; - Object key = "KEY"; String value = "VALUE"; - private static DUnitBlackboard getBlackboard() { - if (blackboard == null) { - blackboard = new DUnitBlackboard(); - } + private DistributedBlackboard getBlackboard() { return blackboard; } @@ -61,10 +62,8 @@ public class RegionConcurrentOperationDUnitTest implements Serializable { @Rule public ExecutorServiceRule executorServiceRule = new ExecutorServiceRule(); - @After - public void tearDown() { - blackboard.initBlackboard(); - } + @Rule + public DistributedBlackboard blackboard = new DistributedBlackboard(); @Test public void getOnProxyRegionFromMultipleThreadsReturnsDifferentObjects() throws Exception { @@ -80,13 +79,13 @@ public class RegionConcurrentOperationDUnitTest implements Serializable { .setCacheLoader(new TestCacheLoader()).create(regionName); }); - Future get1 = executorServiceRule.submit(() -> { - Region region = cacheRule.getCache().getRegion(regionName); + Future<Object> get1 = executorServiceRule.submit(() -> { + Region<Object, Object> region = cacheRule.getCache().getRegion(regionName); return region.get(key); }); - Future get2 = executorServiceRule.submit(() -> { - Region region = cacheRule.getCache().getRegion(regionName); + Future<Object> get2 = executorServiceRule.submit(() -> { + Region<Object, Object> region = cacheRule.getCache().getRegion(regionName); getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS); return region.get(key); }); @@ -113,13 +112,13 @@ public class RegionConcurrentOperationDUnitTest implements Serializable { }); assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(0); - Future get1 = executorServiceRule.submit(() -> { - Region region = cacheRule.getCache().getRegion(regionName); + Future<Object> get1 = executorServiceRule.submit(() -> { + Region<Object, Object> region = cacheRule.getCache().getRegion(regionName); return region.get(key); }); - Future get2 = executorServiceRule.submit(() -> { - Region region = cacheRule.getCache().getRegion(regionName); + Future<Object> get2 = executorServiceRule.submit(() -> { + Region<Object, Object> region = cacheRule.getCache().getRegion(regionName); getBlackboard().waitForGate("Loader", 60, TimeUnit.SECONDS); return region.get(key); }); @@ -131,7 +130,50 @@ public class RegionConcurrentOperationDUnitTest implements Serializable { assertThat(cacheRule.getCache().getRegion(regionName).size()).isEqualTo(1); } - private class TestCacheLoader implements CacheLoader, Serializable { + @Test + public void getOnPartitionedRegionFromMultipleThreadsReturnsDifferentPdxInstances() + throws Exception { + String regionName = getClass().getSimpleName(); + CacheFactory cacheFactory = new CacheFactory(); + cacheFactory.setPdxReadSerialized(true); + cacheRule.createCache(cacheFactory); + InternalCache cache = cacheRule.getCache(); + cache.setCopyOnRead(true); + Region<Object, Object> region = cache.createRegionFactory(PARTITION) + .create(regionName); + + // Keep doing this concurrency test for 30 seconds. + long endTime = Duration.ofSeconds(30).toMillis() + System.currentTimeMillis(); + + while (System.currentTimeMillis() < endTime) { + Callable<Object> getValue = () -> { + while (true) { + Object value = region.get(key); + if (value != null) { + return value; + } + } + }; + + // In this test, two threads are doing gets. One thread puts the value + // We expect that the threads will *always* get different PdxInstance values + Future<Object> get1 = executorServiceRule.submit(getValue); + Future<Object> get2 = executorServiceRule.submit(getValue); + Future<Object> put = executorServiceRule.submit(() -> region.put(key, new TestValue())); + + Object get1value = get1.get(); + Object get2value = get2.get(); + put.get(); + + // Assert the values returned are different objects. + // PdxInstances are not threadsafe and should not be shared between threads. + assertThat(get1value).isNotSameAs(get2value); + region.destroy(key); + + } + } + + private class TestCacheLoader implements CacheLoader<Object, Object>, Serializable { @Override public synchronized Object load(LoaderHelper helper) { @@ -142,4 +184,22 @@ public class RegionConcurrentOperationDUnitTest implements Serializable { @Override public void close() {} } + + private static class TestValue implements PdxSerializable { + int field1 = 5; + String field2 = "field"; + + @Override + public void toData(PdxWriter writer) { + writer.writeInt("field1", field1); + writer.writeString("field2", field2); + + } + + @Override + public void fromData(PdxReader reader) { + reader.readInt("field1"); + reader.readString("field2"); + } + } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java index 098115b..482cf67 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java @@ -1450,8 +1450,8 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return result; } - - private Object getObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks, + @VisibleForTesting + Object getObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) { @@ -1490,10 +1490,16 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, return result; } + @VisibleForTesting + Map getGetFutures() { + return this.getFutures; + } + /** * optimized to only allow one thread to do a search/load, other threads wait on a future */ - private Object optimizedGetObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks, + @VisibleForTesting + Object optimizedGetObject(KeyInfo keyInfo, boolean isCreate, boolean generateCallbacks, Object localValue, boolean disableCopyOnRead, boolean preferCD, ClientProxyMembershipID requestingClient, EntryEventImpl clientEvent, boolean returnTombstones) { @@ -1506,9 +1512,7 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, Object[] valueAndVersion = (Object[]) otherFuture.get(); if (valueAndVersion != null) { result = valueAndVersion[0]; - if (clientEvent != null) { - clientEvent.setVersionTag((VersionTag) valueAndVersion[1]); - } + if (!preferCD && result instanceof CachedDeserializable) { CachedDeserializable cd = (CachedDeserializable) result; if (!disableCopyOnRead && (isCopyOnRead() || isProxy())) { @@ -1520,12 +1524,19 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory, } else if (!disableCopyOnRead) { result = conditionalCopy(result); } - // what was a miss is now a hit - if (isCreate) { - RegionEntry regionEntry = basicGetEntry(keyInfo.getKey()); - updateStatsForGet(regionEntry, true); + // GEODE-8671: for PdxInstance, we need a new reference of it. Don't use the value from + // the Future. + if (!(result instanceof PdxInstance)) { + if (clientEvent != null) { + clientEvent.setVersionTag((VersionTag) valueAndVersion[1]); + } + // what was a miss is now a hit + if (isCreate) { + RegionEntry regionEntry = basicGetEntry(keyInfo.getKey()); + updateStatsForGet(regionEntry, true); + } + return result; } - return result; } } catch (InterruptedException ignore) { Thread.currentThread().interrupt(); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java index d695565..e2cc6fa 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/LocalRegionTest.java @@ -56,7 +56,11 @@ import org.apache.geode.internal.cache.AbstractRegion.PoolFinder; import org.apache.geode.internal.cache.LocalRegion.RegionMapConstructor; import org.apache.geode.internal.cache.LocalRegion.ServerRegionProxyConstructor; import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.VersionedObjectList; +import org.apache.geode.internal.cache.versions.VersionTag; +import org.apache.geode.internal.util.concurrent.FutureResult; +import org.apache.geode.pdx.PdxInstance; public class LocalRegionTest { private EntryEventFactory entryEventFactory; @@ -222,4 +226,43 @@ public class LocalRegionTest { assertThat(result.get("key1")).isNull(); assertThat(result.get("key2")).isEqualTo("value2"); } + + @Test + public void forPdxInstanceByPassTheFutureInLocalRegionOptimizedGetObject() { + LocalRegion region = + spy(new LocalRegion("region", regionAttributes, null, cache, internalRegionArguments, + internalDataView, regionMapConstructor, serverRegionProxyConstructor, entryEventFactory, + poolFinder, regionPerfStatsFactory, disabledClock())); + KeyInfo keyInfo = mock(KeyInfo.class); + Object key = new Object(); + Object result = new Object(); + when(keyInfo.getKey()).thenReturn(key); + FutureResult thisFuture = new FutureResult(mock(CancelCriterion.class)); + thisFuture.set(new Object[] {result, mock(VersionTag.class)}); + region.getGetFutures().put(key, thisFuture); + // For non-PdxInstance, use the value in the Future + Object object = region.optimizedGetObject(keyInfo, true, true, + new Object(), true, true, + mock(ClientProxyMembershipID.class), mock(EntryEventImpl.class), + true); + assertThat(object).isSameAs(result); + + // For PdxInstance, return a new reference by getObject(), bypassing the Future + result = mock(PdxInstance.class); + thisFuture.set(new Object[] {result, mock(VersionTag.class)}); + Object newResult = new Object(); + Object localValue = new Object(); + ClientProxyMembershipID requestingClient = mock(ClientProxyMembershipID.class); + EntryEventImpl clientEvent = mock(EntryEventImpl.class); + when(region.getObject(keyInfo, true, true, + localValue, true, true, + requestingClient, clientEvent, + true)).thenReturn(newResult); + object = region.optimizedGetObject(keyInfo, true, true, + localValue, true, true, + requestingClient, clientEvent, + true); + assertThat(object).isNotSameAs(result); + assertThat(object).isSameAs(newResult); + } }