Cleanup HARegionQueueJUnitTest and BlockingHARegionQueueJUnitTest
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/107d3c4b Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/107d3c4b Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/107d3c4b Branch: refs/heads/feature/GEODE-2632-17 Commit: 107d3c4bf362bd70b489ea78585919264c81dc82 Parents: d393b4a Author: Kirk Lund <kl...@apache.org> Authored: Mon May 22 17:23:46 2017 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Tue May 23 14:47:40 2017 -0700 ---------------------------------------------------------------------- .../ha/BlockingHARegionQueueJUnitTest.java | 169 +- .../cache/ha/HARegionQueueJUnitTest.java | 2307 +++++++++--------- 2 files changed, 1167 insertions(+), 1309 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/107d3c4b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java index 39aa1e6..b529f0c 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java @@ -14,166 +14,141 @@ */ package org.apache.geode.internal.cache.ha; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.hamcrest.CoreMatchers.*; +import static org.junit.Assert.*; -import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.awaitility.Awaitility; - -import org.apache.geode.test.junit.categories.ClientSubscriptionTest; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.apache.geode.cache.CacheException; import org.apache.geode.internal.cache.Conflatable; import org.apache.geode.internal.cache.EventID; +import org.apache.geode.test.junit.categories.ClientSubscriptionTest; import org.apache.geode.test.junit.categories.IntegrationTest; /** * Test runs all tests of HARegionQueueJUnitTest using BlockingHARegionQueue instead of * HARegionQueue. - * - * */ @Category({IntegrationTest.class, ClientSubscriptionTest.class}) public class BlockingHARegionQueueJUnitTest extends HARegionQueueJUnitTest { - /** - * Creates Blocking HA region-queue object - * - * @return Blocking HA region-queue object - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException - */ - protected HARegionQueue createHARegionQueue(String name) - throws IOException, ClassNotFoundException, CacheException, InterruptedException { - HARegionQueue regionqueue = - HARegionQueue.getHARegionQueueInstance(name, cache, HARegionQueue.BLOCKING_HA_QUEUE, false); - return regionqueue; - } - - /** - * Creates Blocking HA region-queue object - * - * @return Blocking HA region-queue object - * @throws IOException - * @throws ClassNotFoundException - * @throws CacheException - * @throws InterruptedException - */ - protected HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs) - throws IOException, ClassNotFoundException, CacheException, InterruptedException { - HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs, - HARegionQueue.BLOCKING_HA_QUEUE, false); - return regionqueue; + @Override + protected int queueType() { + return HARegionQueue.BLOCKING_HA_QUEUE; } /** * Tests the effect of a put which is blocked because of capacity constraint & subsequent passage * because of take operation - * */ @Test - public void testBlockingPutAndTake() - throws InterruptedException, IOException, ClassNotFoundException { + public void testBlockingPutAndTake() throws Exception { HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); hrqa.setBlockingQueueCapacity(1); - final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndTake", hrqa); - hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + + HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa); + hrq.setPrimary(true); // fix for 40314 - capacity constraint is checked for primary only. + EventID id1 = new EventID(new byte[] {1}, 1, 1); hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() { - try { - EventID id2 = new EventID(new byte[] {1}, 1, 2); - hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); - } catch (Exception e) { - encounteredException = true; - } + + AtomicBoolean threadStarted = new AtomicBoolean(false); + + Thread thread = new Thread(() -> { + try { + threadStarted.set(true); + EventID id2 = new EventID(new byte[] {1}, 1, 2); + hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); + } catch (InterruptedException e) { + errorCollector.addError(e); } }); - t1.start(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); + thread.start(); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get()); + Conflatable conf = (Conflatable) hrq.take(); - assertNotNull(conf); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive()); + assertThat(conf, notNullValue()); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive()); } /** * Test Scenario : BlockingQueue capacity is 1. The first put should be successful. The second put * should block till a peek/remove happens. - * */ @Test - public void testBlockingPutAndPeekRemove() - throws InterruptedException, IOException, ClassNotFoundException { + public void testBlockingPutAndPeekRemove() throws Exception { HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); hrqa.setBlockingQueueCapacity(1); - final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndPeekRemove", hrqa); + + HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName(), hrqa); hrq.setPrimary(true);// fix for 40314 - capacity constraint is checked for primary only. + EventID id1 = new EventID(new byte[] {1}, 1, 1); hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() { - try { - EventID id2 = new EventID(new byte[] {1}, 1, 2); - hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); - } catch (Exception e) { - encounteredException = true; - } + + AtomicBoolean threadStarted = new AtomicBoolean(false); + + Thread thread = new Thread(() -> { + try { + threadStarted.set(true); + EventID id2 = new EventID(new byte[] {1}, 1, 2); + hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); + } catch (Exception e) { + errorCollector.addError(e); } }); - t1.start(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); + thread.start(); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get()); + Conflatable conf = (Conflatable) hrq.peek(); - assertNotNull(conf); + assertThat(conf, notNullValue()); + hrq.remove(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive()); - assertFalse("Exception occurred in put-thread", encounteredException); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive()); } /** * Test Scenario :Blocking Queue capacity is 1. The first put should be successful.The second put * should block till the first put expires. - * + * <p> + * fix for 40314 - capacity constraint is checked for primary only and expiry is not applicable on + * primary so marking this test as invalid. */ - // fix for 40314 - capacity constraint is checked for primary only and - // expiry is not applicable on primary so marking this test as invalid. - @Ignore @Test - public void testBlockingPutAndExpiry() - throws InterruptedException, IOException, ClassNotFoundException { + public void testBlockingPutAndExpiry() throws Exception { HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); hrqa.setBlockingQueueCapacity(1); hrqa.setExpiryTime(1); - final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndExpiry", hrqa); + + HARegionQueue hrq = this.createHARegionQueue(this.testName.getMethodName(), hrqa); EventID id1 = new EventID(new byte[] {1}, 1, 1); - long start = System.currentTimeMillis(); + hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() { - try { - EventID id2 = new EventID(new byte[] {1}, 1, 2); - hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); - } catch (Exception e) { - encounteredException = true; - } + + AtomicBoolean threadStarted = new AtomicBoolean(false); + + Thread thread = new Thread(() -> { + try { + threadStarted.set(true); + EventID id2 = new EventID(new byte[] {1}, 1, 2); + hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); + } catch (Exception e) { + errorCollector.addError(e); } }); - t1.start(); - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); - waitAtLeast(1000, start, () -> { - assertFalse("Put-thread blocked unexpectedly", t1.isAlive()); - }); - assertFalse("Exception occurred in put-thread", encounteredException); + thread.start(); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> threadStarted.get()); + + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !thread.isAlive()); } }