This is an automated email from the ASF dual-hosted git repository. eshu11 pushed a commit to branch feature/GEODE-6900 in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6900 by this push: new 21078b4 GEODE-6900: Add a unit test with detect read conflicts 21078b4 is described below commit 21078b4d184c892a5d3ff70f5b13ba355ee4aa67 Author: eshu <e...@pivotal.io> AuthorDate: Fri Jun 21 17:29:14 2019 -0700 GEODE-6900: Add a unit test with detect read conflicts --- .../cache/TXDetectReadConflictJUnitTest.java | 108 ++++++++++++++++++--- 1 file changed, 95 insertions(+), 13 deletions(-) diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java index 810f148..7d671cb 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/TXDetectReadConflictJUnitTest.java @@ -16,9 +16,13 @@ package org.apache.geode.internal.cache; import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.Assert.assertEquals; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.After; import org.junit.Before; @@ -29,6 +33,7 @@ import org.junit.rules.TestName; import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.CommitConflictException; import org.apache.geode.cache.Region; import org.apache.geode.cache.RegionShortcut; import org.apache.geode.distributed.internal.DistributionConfig; @@ -45,10 +50,17 @@ public class TXDetectReadConflictJUnitTest { @Rule public TestName name = new TestName(); - protected Cache cache = null; - protected Region region = null; - protected Region regionpr = null; - + private Cache cache = null; + private Region region = null; + private Region regionPR = null; + private CountDownLatch allowWriteTransactionToCommitLatch = new CountDownLatch(1); + private CountDownLatch allowReadTransactionToProceedLatch = new CountDownLatch(1); + private final String key = "key"; + private final String key1 = "key1"; + private final String value = "value"; + private final String value1 = "value"; + private final String newValue = "newValue"; + private final String newValue1 = "newValue1"; @Before public void setUp() throws Exception { @@ -69,7 +81,7 @@ public class TXDetectReadConflictJUnitTest { props.put(MCAST_PORT, "0"); props.put(LOCATORS, ""); cache = new CacheFactory(props).create(); - regionpr = cache.createRegionFactory(RegionShortcut.PARTITION).create("testRegionPR"); + regionPR = cache.createRegionFactory(RegionShortcut.PARTITION).create("testRegionPR"); } @After @@ -81,12 +93,12 @@ public class TXDetectReadConflictJUnitTest { public void testReadConflictsRR() throws Exception { cache.close(); createCache(); - region.put("key", "value"); - region.put("key1", "value1"); + region.put(key, value); + region.put(key1, value1); TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager(); mgr.begin(); - assertEquals("value", region.get("key")); - assertEquals("value1", region.get("key1")); + assertEquals(value, region.get(key)); + assertEquals(value1, region.get(key1)); mgr.commit(); } @@ -94,12 +106,82 @@ public class TXDetectReadConflictJUnitTest { public void testReadConflictsPR() throws Exception { cache.close(); createCachePR(); - regionpr.put("key", "value"); - regionpr.put("key1", "value1"); + regionPR.put(key, value); + regionPR.put(key1, value1); TXManagerImpl mgr = (TXManagerImpl) cache.getCacheTransactionManager(); mgr.begin(); - assertEquals("value", regionpr.get("key")); - assertEquals("value1", regionpr.get("key1")); + assertEquals(value, regionPR.get(key)); + assertEquals(value1, regionPR.get(key1)); mgr.commit(); } + + @Test + public void readConflictsTransactionCanBlockWriteTransaction() { + cache.close(); + createCache(); + + region.put(key, value); + region.put(key1, value1); + TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + txManager.begin(); + assertThat(region.get(key)).isSameAs(value); + region.put(key1, newValue1); + TXState txState = + (TXState) ((TXStateProxyImpl) TXManagerImpl.getCurrentTXState()).getRealDeal(null, null); + txState.setAfterReservation(() -> readTransactionAfterReservation()); + Runnable task = () -> doPutOnReadKeyTransaction(); + new Thread(task).start(); + txManager.commit(); + assertThat(region.get(key)).isSameAs(value); + assertThat(region.get(key1)).isSameAs(newValue1); + } + + private void readTransactionAfterReservation() { + allowWriteTransactionToCommitLatch.countDown(); + try { + allowReadTransactionToProceedLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + private void doPutOnReadKeyTransaction() { + TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + txManager.begin(); + region.put(key, newValue); // expect commit conflict + try { + allowWriteTransactionToCommitLatch.await(10, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + assertThatThrownBy(() -> txManager.commit()).isExactlyInstanceOf(CommitConflictException.class); + allowReadTransactionToProceedLatch.countDown(); + } + + @Test + public void readConflictsTransactionCanDetectStateChange() throws Exception { + cache.close(); + createCache(); + + region.put(key, value); + region.put(key1, value1); + TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + txManager.begin(); + assertThat(region.get(key)).isSameAs(value); + region.put(key1, newValue1); + Runnable task = () -> doPutTransaction(); + new Thread(task).start(); + allowReadTransactionToProceedLatch.await(); + assertThatThrownBy(() -> txManager.commit()).isExactlyInstanceOf(CommitConflictException.class); + assertThat(region.get(key)).isSameAs(newValue); + assertThat(region.get(key1)).isSameAs(value1); + } + + private void doPutTransaction() { + TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager(); + txManager.begin(); + region.put(key, newValue); // expect commit conflict + txManager.commit(); + allowReadTransactionToProceedLatch.countDown(); + } }