http://git-wip-us.apache.org/repos/asf/geode/blob/d1ec508e/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java ---------------------------------------------------------------------- diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java index 4028ab3..929093d 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java @@ -15,6 +15,8 @@ package org.apache.geode.internal.cache.ha; import static org.apache.geode.distributed.ConfigurationProperties.*; +import static org.hamcrest.CoreMatchers.*; +import static org.hamcrest.number.OrderingComparison.*; import static org.junit.Assert.*; import java.io.IOException; @@ -23,101 +25,75 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.awaitility.Awaitility; - -import org.apache.geode.internal.cache.InternalCache; -import org.apache.geode.test.junit.categories.ClientSubscriptionTest; import org.junit.After; import org.junit.Before; -import org.junit.Ignore; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ErrorCollector; +import org.junit.rules.TestName; -import org.apache.geode.LogWriter; -import org.apache.geode.SystemFailure; -import org.apache.geode.cache.Cache; import org.apache.geode.cache.CacheException; import org.apache.geode.cache.CacheFactory; import org.apache.geode.cache.CacheListener; import org.apache.geode.cache.EntryEvent; import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionExistsException; import org.apache.geode.cache.util.CacheListenerAdapter; -import org.apache.geode.distributed.DistributedSystem; import org.apache.geode.internal.cache.Conflatable; import org.apache.geode.internal.cache.EventID; +import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.RegionQueue; import org.apache.geode.test.dunit.ThreadUtils; +import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties; +import org.apache.geode.test.junit.categories.ClientSubscriptionTest; import org.apache.geode.test.junit.categories.IntegrationTest; /** * This is a test for the APIs of a HARegionQueue and verifies that the head, tail and size counters * are updated properly. + * + * TODO: need to rewrite a bunch of tests in HARegionQueueJUnitTest */ @Category({IntegrationTest.class, ClientSubscriptionTest.class}) public class HARegionQueueJUnitTest { - /** The cache instance */ - protected InternalCache cache = null; + /** total number of threads doing put operations */ + private static final int TOTAL_PUT_THREADS = 10; - /** Logger for this test */ - protected LogWriter logger; + private static HARegionQueue hrqForTestSafeConflationRemoval; + private static List list1; - /** The <code>RegionQueue</code> instance */ - protected HARegionQueue rq; + protected InternalCache cache; + private HARegionQueue haRegionQueue; - /** total number of threads doing put operations */ - private static final int TOTAL_PUT_THREADS = 10; + @Rule + public DistributedRestoreSystemProperties restoreSystemProperties = + new DistributedRestoreSystemProperties(); - boolean expiryCalled = false; + @Rule + public ErrorCollector errorCollector = new ErrorCollector(); - volatile boolean encounteredException = false; - boolean allowExpiryToProceed = false; - boolean complete = false; + @Rule + public TestName testName = new TestName(); @Before public void setUp() throws Exception { - cache = createCache(); - logger = cache.getLogger(); - encounteredException = false; + this.cache = createCache(); } @After public void tearDown() throws Exception { - cache.close(); - } - - /** - * Creates the cache instance for the test - */ - private InternalCache createCache() throws CacheException { - return (InternalCache) new CacheFactory().set(MCAST_PORT, "0").create(); - } - - /** - * Creates HA region-queue object - */ - private HARegionQueue createHARegionQueue(String name) - throws IOException, ClassNotFoundException, CacheException, InterruptedException { - HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, - HARegionQueue.NON_BLOCKING_HA_QUEUE, false); - return regionqueue; - } - - /** - * Creates region-queue object - */ - private HARegionQueue createHARegionQueue(String name, HARegionQueueAttributes attrs) - throws IOException, ClassNotFoundException, CacheException, InterruptedException { - HARegionQueue regionqueue = HARegionQueue.getHARegionQueueInstance(name, cache, attrs, - HARegionQueue.NON_BLOCKING_HA_QUEUE, false); - return regionqueue; + this.cache.close(); + hrqForTestSafeConflationRemoval = null; } /** @@ -129,14 +105,10 @@ public class HARegionQueueJUnitTest { */ @Test public void testQueuePutWithoutConflation() throws Exception { - logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation BEGIN"); - - rq = createHARegionQueue("testOfferNoConflation"); + this.haRegionQueue = createHARegionQueue(this.testName.getMethodName()); int putPerProducer = 20; createAndRunProducers(false, false, false, putPerProducer); - assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size()); - - logger.info("HARegionQueueJUnitTest : testQueuePutWithoutConflation END"); + assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS)); } /** @@ -149,14 +121,10 @@ public class HARegionQueueJUnitTest { */ @Test public void testQueuePutWithConflation() throws Exception { - logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation BEGIN"); - - rq = createHARegionQueue("testOfferConflation"); + this.haRegionQueue = createHARegionQueue(this.testName.getMethodName()); int putPerProducer = 20; createAndRunProducers(true, false, true, putPerProducer); - assertEquals(putPerProducer, rq.size()); - - logger.info("HARegionQueueJUnitTest : testQueuePutWithConflation END"); + assertThat(this.haRegionQueue.size(), is(putPerProducer)); } /** @@ -166,319 +134,150 @@ public class HARegionQueueJUnitTest { * 3)Wait till all put-threads complete their job <br> * 4)verify that the size of the queue is equal to the total number of puts done by one thread (as * rest of them will be duplicates and hence will be replaced) - * - * TODO:Dinesh : Work on optimizing the handling of receiving duplicate events */ @Test public void testQueuePutWithDuplicates() throws Exception { - logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates BEGIN"); - - rq = createHARegionQueue("testQueuePutWithDuplicates"); + this.haRegionQueue = createHARegionQueue(this.testName.getMethodName()); int putPerProducer = 20; - // createAndRunProducers(false, true, true, putPerProducer); - /* Suyog: Only one thread can enter DACE at a time */ createAndRunProducers(false, false, true, putPerProducer); - assertEquals(putPerProducer * TOTAL_PUT_THREADS, rq.size()); - - logger.info("HARegionQueueJUnitTest : testQueuePutWithDuplicates END"); - } - - /** - * Creates and runs the put threads which will create the conflatable objects and add them to the - * queue - * - * @param generateSameKeys - if all the producers need to put objects with same set of keys - * (needed for conflation testing) - * @param generateSameIds - if all the producers need to put objects with same set of ids (needed - * for duplicates testing) - * @param conflationEnabled - true if all producers need to put objects with conflation enabled, - * false otherwise. - * @param putPerProducer - number of objects offered to the queue by each producer - * @throws Exception - thrown if any problem occurs in test execution - */ - private void createAndRunProducers(boolean generateSameKeys, boolean generateSameIds, - boolean conflationEnabled, int putPerProducer) throws Exception { - Producer[] putThreads = new Producer[TOTAL_PUT_THREADS]; - - int i = 0; - - // Create the put-threads, each generating same/different set of ids/keys as - // per the parameters - for (i = 0; i < TOTAL_PUT_THREADS; i++) { - String keyPrefix = null; - long startId; - if (generateSameKeys) { - keyPrefix = "key"; - } else { - keyPrefix = i + "key"; - } - if (generateSameIds) { - startId = 1; - } else { - startId = i * 100000; - } - putThreads[i] = - new Producer("Producer-" + i, keyPrefix, startId, putPerProducer, conflationEnabled); - } - - // start the put-threads - for (i = 0; i < TOTAL_PUT_THREADS; i++) { - putThreads[i].start(); - } - - // call join on the put-threads so that this thread waits till they complete - // before doing verfication - for (i = 0; i < TOTAL_PUT_THREADS; i++) { - ThreadUtils.join(putThreads[i], 30 * 1000); - } - assertFalse(encounteredException); + assertThat(this.haRegionQueue.size(), is(putPerProducer * TOTAL_PUT_THREADS)); } /* * Test method for 'org.apache.geode.internal.cache.ha.HARegionQueue.addDispatchedMessage(Object)' */ @Test - public void testAddDispatchedMessageObject() { - try { - // HARegionQueue haRegionQueue = new HARegionQueue("testing", cache); - HARegionQueue haRegionQueue = createHARegionQueue("testing"); - assertTrue(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty()); - // TODO: - - haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1); - haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2); + public void testAddDispatchedMessageObject() throws Exception { + this.haRegionQueue = createHARegionQueue(this.testName.getMethodName()); + assertThat(HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true)); - assertTrue(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty()); - // HARegionQueue.getDispatchedMessagesMapForTesting().clear(); + this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 1), 1); + this.haRegionQueue.addDispatchedMessage(new ThreadIdentifier(new byte[1], 2), 2); - } catch (Exception e) { - throw new AssertionError("Test encountered an exception due to ", e); - } + assertThat(!HARegionQueue.getDispatchedMessagesMapForTesting().isEmpty(), is(true)); } /** * tests the blocking peek functionality of BlockingHARegionQueue */ @Test - public void testBlockQueue() { - exceptionInThread = false; - testFailed = false; - try { - final HARegionQueue bQ = HARegionQueue.getHARegionQueueInstance("testing", cache, - HARegionQueue.BLOCKING_HA_QUEUE, false); - Thread[] threads = new Thread[10]; - final CyclicBarrier barrier = new CyclicBarrier(threads.length + 1); - for (int i = 0; i < threads.length; i++) { - threads[i] = new Thread() { - public void run() { - try { - barrier.await(); - long startTime = System.currentTimeMillis(); - Object obj = bQ.peek(); - if (obj == null) { - testFailed = true; - message.append( - " Failed : failed since object was null and was not expected to be null \n"); - } - long totalTime = System.currentTimeMillis() - startTime; + public void testBlockQueue() throws Exception { + HARegionQueue regionQueue = HARegionQueue.getHARegionQueueInstance( + this.testName.getMethodName(), this.cache, HARegionQueue.BLOCKING_HA_QUEUE, false); + Thread[] threads = new Thread[10]; + int threadsLength = threads.length; + CyclicBarrier barrier = new CyclicBarrier(threadsLength + 1); + + for (int i = 0; i < threadsLength; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + barrier.await(); + long startTime = System.currentTimeMillis(); + Object obj = regionQueue.peek(); + if (obj == null) { + errorCollector.addError(new AssertionError( + "Failed : failed since object was null and was not expected to be null")); + } + long totalTime = System.currentTimeMillis() - startTime; - if (totalTime < 2000) { - testFailed = true; - message - .append(" Failed : Expected time to be greater than 2000 but it is not so "); - } - } catch (Exception e) { - exceptionInThread = true; - exception = e; + if (totalTime < 2000) { + errorCollector.addError(new AssertionError( + " Failed : Expected time to be greater than 2000 but it is not so ")); } + } catch (Exception e) { + errorCollector.addError(e); } - }; - - } - - for (int k = 0; k < threads.length; k++) { - threads[k].start(); - } - barrier.await(); - Thread.sleep(5000); - - EventID id = new EventID(new byte[] {1}, 1, 1); - bQ.put(new ConflatableObject("key", "value", id, false, "testing")); - - long startTime = System.currentTimeMillis(); - for (int k = 0; k < threads.length; k++) { - ThreadUtils.join(threads[k], 60 * 1000); - } - - long totalTime = System.currentTimeMillis() - startTime; - - if (totalTime >= 60000) { - fail(" Test taken too long "); - } - - if (testFailed) { - fail(" test failed due to " + message); - } - - } catch (Exception e) { - throw new AssertionError(" Test failed due to ", e); + } + }; } - } - - private static volatile int counter = 0; - - protected boolean exceptionInThread = false; - - protected boolean testFailed = false; - - protected StringBuffer message = new StringBuffer(); - - protected Exception exception = null; - private synchronized int getCounter() { - return ++counter; - } - - /** - * Thread to perform PUTs into the queue - */ - class Producer extends Thread { - /** total number of puts by this thread */ - long totalPuts = 0; - - /** sleep between successive puts */ - long sleeptime = 10; - - /** prefix to keys of all objects put by this thread */ - String keyPrefix; + for (Thread thread1 : threads) { + thread1.start(); + } - /** startingId for sequence-ids of all objects put by this thread */ - long startingId; + barrier.await(); - /** name of this producer thread */ - String producerName; + Thread.sleep(5000); - /** - * boolean to indicate whether this thread should create conflation enabled entries - */ - boolean createConflatables; + EventID id = new EventID(new byte[] {1}, 1, 1); + regionQueue + .put(new ConflatableObject("key", "value", id, false, this.testName.getMethodName())); - /** - * Constructor - * - * @param name - name for this thread - * @param keyPrefix - prefix to keys of all objects put by this thread - * @param startingId - startingId for sequence-ids of all objects put by this thread - * @param totalPuts total number of puts by this thread - * @param createConflatableEvents - boolean to indicate whether this thread should create - * conflation enabled entries - */ - Producer(String name, String keyPrefix, long startingId, long totalPuts, - boolean createConflatableEvents) { - super(name); - this.producerName = name; - this.keyPrefix = keyPrefix; - this.startingId = startingId; - this.totalPuts = totalPuts; - this.createConflatables = createConflatableEvents; - setDaemon(true); + long startTime = System.currentTimeMillis(); + for (Thread thread : threads) { + ThreadUtils.join(thread, 60 * 1000); } - /** Create Conflatable objects and put them into the Queue. */ - @Override - public void run() { - if (producerName == null) { - producerName = Thread.currentThread().getName(); - } - for (long i = 0; i < totalPuts; i++) { - String REGION_NAME = "test"; - try { - ConflatableObject event = new ConflatableObject(keyPrefix + i, "val" + i, - new EventID(new byte[] {1}, startingId, startingId + i), createConflatables, - REGION_NAME); - - logger.fine("putting for key = " + keyPrefix + i); - rq.put(event); - Thread.sleep(sleeptime); - } catch (VirtualMachineError e) { - SystemFailure.initiateFailure(e); - throw e; - } catch (Throwable e) { - logger.severe("Exception while running Producer;continue running.", e); - encounteredException = true; - break; - } - } - logger.info(producerName + " : Puts completed"); + long totalTime = System.currentTimeMillis() - startTime; + + if (totalTime >= 60000) { + fail(" Test taken too long "); } } /** - * tests whether expiry of entry in the regin queue occurs as expected + * tests whether expiry of entry in the region queue occurs as expected */ @Test - public void testExpiryPositive() - throws InterruptedException, IOException, ClassNotFoundException { + public void testExpiryPositive() throws Exception { HARegionQueueAttributes haa = new HARegionQueueAttributes(); haa.setExpiryTime(1); - HARegionQueue regionqueue = createHARegionQueue("testing", haa); + + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa); long start = System.currentTimeMillis(); - regionqueue.put( - new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing")); - Map map = (Map) regionqueue.getConflationMapForTesting().get("testing"); + + regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, + this.testName.getMethodName())); + + Map map = (Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()); waitAtLeast(1000, start, () -> { - assertEquals(Collections.EMPTY_MAP, map); - assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys()); + assertThat(map, is(Collections.emptyMap())); + assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet())); }); } /** - * Wait until a given runnable stops throwing exceptions. It should take at least - * minimumElapsedTime after the supplied start time to happen. - * - * This is useful for validating that an entry doesn't expire until a certain amount of time has - * passed - */ - protected void waitAtLeast(final int minimumElapsedTIme, final long start, - final Runnable runnable) { - Awaitility.await().atMost(1, TimeUnit.MINUTES).until(runnable); - long elapsed = System.currentTimeMillis() - start; - assertTrue(elapsed >= minimumElapsedTIme); - } - - /** * tests whether expiry of a conflated entry in the region queue occurs as expected */ @Test - public void testExpiryPositiveWithConflation() - throws InterruptedException, IOException, ClassNotFoundException { + public void testExpiryPositiveWithConflation() throws Exception { HARegionQueueAttributes haa = new HARegionQueueAttributes(); haa.setExpiryTime(1); - HARegionQueue regionqueue = createHARegionQueue("testing", haa); + + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa); long start = System.currentTimeMillis(); - regionqueue.put( - new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing")); - regionqueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2), - true, "testing")); - assertTrue( + + regionQueue.put(new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, + this.testName.getMethodName())); + + regionQueue.put(new ConflatableObject("key", "newValue", new EventID(new byte[] {1}, 1, 2), + true, this.testName.getMethodName())); + + assertThat( " Expected region size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.size() == 0)); - assertTrue( + !regionQueue.isEmpty(), is(true)); + assertThat( " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getAvalaibleIds().size() == 0)); - assertTrue( + !regionQueue.getAvalaibleIds().isEmpty(), is(true)); + assertThat( " Expected conflation map size not to be zero since expiry time has not been exceeded but it is not so " - + ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key"))), - !((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key")) == null)); - assertTrue( + + ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())) + .get("key"), + ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())) + .get("key"), + not(sameInstance(null))); + assertThat( " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getEventsMapForTesting().size() == 0)); + !regionQueue.getEventsMapForTesting().isEmpty(), is(true)); waitAtLeast(1000, start, () -> { - assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys()); - assertEquals(Collections.EMPTY_SET, regionqueue.getAvalaibleIds()); - assertEquals(Collections.EMPTY_MAP, regionqueue.getConflationMapForTesting().get("testing")); - assertEquals(Collections.EMPTY_MAP, regionqueue.getEventsMapForTesting()); + assertThat(regionQueue.getRegion().keys(), is(Collections.emptySet())); + assertThat(regionQueue.getAvalaibleIds(), is(Collections.emptySet())); + assertThat(regionQueue.getConflationMapForTesting().get(this.testName.getMethodName()), + is(Collections.emptyMap())); + assertThat(regionQueue.getEventsMapForTesting(), is(Collections.emptyMap())); }); } @@ -486,38 +285,37 @@ public class HARegionQueueJUnitTest { * tests a ThreadId not being expired if it was updated */ @Test - public void testNoExpiryOfThreadId() { - try { - HARegionQueueAttributes haa = new HARegionQueueAttributes(); - haa.setExpiryTime(45); - // RegionQueue regionqueue = new HARegionQueue("testing", cache, haa); - HARegionQueue regionqueue = createHARegionQueue("testing", haa); - EventID ev1 = new EventID(new byte[] {1}, 1, 1); - EventID ev2 = new EventID(new byte[] {1}, 1, 2); - Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing"); - Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing"); - regionqueue.put(cf1); - final long tailKey = regionqueue.tailKey.get(); - regionqueue.put(cf2); - // Invalidate will trigger the expiration of the entry - // See HARegionQueue.createCacheListenerForHARegion - regionqueue.getRegion().invalidate(tailKey); - assertTrue( - " Expected region size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.size() == 0)); - assertTrue(" Expected the available id's size not to have counter 1 but it has ", - !(regionqueue.getAvalaibleIds().contains(new Long(1)))); - assertTrue(" Expected the available id's size to have counter 2 but it does not have ", - (regionqueue.getAvalaibleIds().contains(new Long(2)))); - assertTrue(" Expected eventID map not to have the first event, but it has", - !(regionqueue.getCurrentCounterSet(ev1).contains(new Long(1)))); - assertTrue(" Expected eventID map to have the second event, but it does not", - (regionqueue.getCurrentCounterSet(ev2).contains(new Long(2)))); - } + public void testNoExpiryOfThreadId() throws Exception { + HARegionQueueAttributes haa = new HARegionQueueAttributes(); + haa.setExpiryTime(45); - catch (Exception e) { - throw new AssertionError("test failed due to ", e); - } + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), haa); + EventID ev1 = new EventID(new byte[] {1}, 1, 1); + EventID ev2 = new EventID(new byte[] {1}, 1, 2); + Conflatable cf1 = + new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName()); + Conflatable cf2 = + new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName()); + + regionQueue.put(cf1); + long tailKey = regionQueue.tailKey.get(); + regionQueue.put(cf2); + + // Invalidate will trigger the expiration of the entry + // See HARegionQueue.createCacheListenerForHARegion + regionQueue.getRegion().invalidate(tailKey); + + assertThat( + " Expected region size not to be zero since expiry time has not been exceeded but it is not so ", + !regionQueue.isEmpty(), is(true)); + assertThat(" Expected the available id's size not to have counter 1 but it has ", + !regionQueue.getAvalaibleIds().contains(1L), is(true)); + assertThat(" Expected the available id's size to have counter 2 but it does not have ", + regionQueue.getAvalaibleIds().contains(2L), is(true)); + assertThat(" Expected eventID map not to have the first event, but it has", + !regionQueue.getCurrentCounterSet(ev1).contains(1L), is(true)); + assertThat(" Expected eventID map to have the second event, but it does not", + regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true)); } /** @@ -525,66 +323,64 @@ public class HARegionQueueJUnitTest { * being put in the queue */ @Test - public void testQRMComingBeforeLocalPut() { - try { - // RegionQueue regionqueue = new HARegionQueue("testing", cache); - HARegionQueue regionqueue = createHARegionQueue("testing"); - EventID id = new EventID(new byte[] {1}, 1, 1); - regionqueue.removeDispatchedEvents(id); - regionqueue.put(new ConflatableObject("key", "value", id, true, "testing")); - assertTrue(" Expected key to be null since QRM for the message id had already arrived ", - !regionqueue.getRegion().containsKey(new Long(1))); - } catch (Exception e) { - throw new AssertionError("test failed due to ", e); - } + public void testQRMComingBeforeLocalPut() throws Exception { + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName()); + EventID id = new EventID(new byte[] {1}, 1, 1); + + regionQueue.removeDispatchedEvents(id); + regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName())); + + assertThat(" Expected key to be null since QRM for the message id had already arrived ", + !regionQueue.getRegion().containsKey(1L), is(true)); } /** * test verifies correct expiry of ThreadIdentifier in the HARQ if no corresponding put comes */ @Test - public void testOnlyQRMComing() throws InterruptedException, IOException, ClassNotFoundException { + public void testOnlyQRMComing() throws Exception { HARegionQueueAttributes harqAttr = new HARegionQueueAttributes(); harqAttr.setExpiryTime(1); - // RegionQueue regionqueue = new HARegionQueue("testing", cache, harqAttr); - HARegionQueue regionqueue = createHARegionQueue("testing", harqAttr); + + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName(), harqAttr); EventID id = new EventID(new byte[] {1}, 1, 1); long start = System.currentTimeMillis(); - regionqueue.removeDispatchedEvents(id); - assertTrue(" Expected testingID to be present since only QRM achieved ", - regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1))); + + regionQueue.removeDispatchedEvents(id); + + assertThat(" Expected testingID to be present since only QRM achieved ", + regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)), is(true)); + waitAtLeast(1000, start, - () -> assertTrue( + () -> assertThat( " Expected testingID not to be present since it should have expired after 2.5 seconds", - !regionqueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)))); + !regionQueue.getRegion().containsKey(new ThreadIdentifier(new byte[] {1}, 1)), + is(true))); } /** * test all relevant data structures are updated on a local put */ @Test - public void testPutPath() { - try { - HARegionQueue regionqueue = createHARegionQueue("testing"); - Conflatable cf = - new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, "testing"); - regionqueue.put(cf); - assertTrue(" Expected region peek to return cf but it is not so ", - (regionqueue.peek().equals(cf))); - assertTrue( - " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getAvalaibleIds().size() == 0)); - assertTrue( - " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ", - ((((Map) (regionqueue.getConflationMapForTesting().get("testing"))).get("key")) - .equals(new Long(1)))); - assertTrue( - " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getEventsMapForTesting().size() == 0)); + public void testPutPath() throws Exception { + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName()); + Conflatable cf = new ConflatableObject("key", "value", new EventID(new byte[] {1}, 1, 1), true, + this.testName.getMethodName()); - } catch (Exception e) { - throw new AssertionError("Exception occurred in test due to ", e); - } + regionQueue.put(cf); + + assertThat(" Expected region peek to return cf but it is not so ", regionQueue.peek(), is(cf)); + assertThat( + " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", + !regionQueue.getAvalaibleIds().isEmpty(), is(true)); + assertThat( + " Expected conflation map to have entry for this key since expiry time has not been exceeded but it is not so ", + ((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())) + .get("key"), + is(1L)); + assertThat( + " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", + !regionQueue.getEventsMapForTesting().isEmpty(), is(true)); } /** @@ -592,58 +388,64 @@ public class HARegionQueueJUnitTest { * there - verify the next five entries and their relevant data is present */ @Test - public void testQRMDispatch() { - try { - HARegionQueue regionqueue = createHARegionQueue("testing"); - Conflatable[] cf = new Conflatable[10]; - // put 10 conflatable objects - for (int i = 0; i < 10; i++) { - cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true, - "testing"); - regionqueue.put(cf[i]); - } - // remove the first 5 by giving the right sequence id - regionqueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4)); - // verify 1-5 not in region - for (long i = 1; i < 6; i++) { - assertTrue(!regionqueue.getRegion().containsKey(new Long(i))); - } - // verify 6-10 still in region queue - for (long i = 6; i < 11; i++) { - assertTrue(regionqueue.getRegion().containsKey(new Long(i))); - } - // verify 1-5 not in conflation map - for (long i = 0; i < 5; i++) { - assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing")) - .containsKey("key" + i)); - } - // verify 6-10 in conflation map - for (long i = 5; i < 10; i++) { - assertTrue( - ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i)); - } + public void testQRMDispatch() throws Exception { + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName()); + Conflatable[] cf = new Conflatable[10]; + + // put 10 conflatable objects + for (int i = 0; i < 10; i++) { + cf[i] = new ConflatableObject("key" + i, "value", new EventID(new byte[] {1}, 1, i), true, + this.testName.getMethodName()); + regionQueue.put(cf[i]); + } - EventID eid = new EventID(new byte[] {1}, 1, 6); - // verify 1-5 not in eventMap - for (long i = 1; i < 6; i++) { - assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i))); - } - // verify 6-10 in event Map - for (long i = 6; i < 11; i++) { - assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i))); - } + // remove the first 5 by giving the right sequence id + regionQueue.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 4)); - // verify 1-5 not in available Id's map - for (long i = 1; i < 6; i++) { - assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i))); - } + // verify 1-5 not in region + for (int i = 1; i < 6; i++) { + assertThat(!regionQueue.getRegion().containsKey((long) i), is(true)); + } - // verify 6-10 in available id's map - for (long i = 6; i < 11; i++) { - assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i))); - } - } catch (Exception e) { - throw new AssertionError("Exception occurred in test due to ", e); + // verify 6-10 still in region queue + for (int i = 6; i < 11; i++) { + assertThat(regionQueue.getRegion().containsKey((long) i), is(true)); + } + + // verify 1-5 not in conflation map + for (int i = 0; i < 5; i++) { + assertThat( + !((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())) + .containsKey("key" + i), + is(true)); + } + + // verify 6-10 in conflation map + for (int i = 5; i < 10; i++) { + assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())) + .containsKey("key" + i), is(true)); + } + + EventID eid = new EventID(new byte[] {1}, 1, 6); + + // verify 1-5 not in eventMap + for (int i = 1; i < 6; i++) { + assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); + } + + // verify 6-10 in event Map + for (int i = 6; i < 11; i++) { + assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); + } + + // verify 1-5 not in available Id's map + for (int i = 1; i < 6; i++) { + assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true)); + } + + // verify 6-10 in available id's map + for (int i = 6; i < 11; i++) { + assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true)); } } @@ -652,68 +454,74 @@ public class HARegionQueueJUnitTest { * 1-7 not there - verify data for 8-10 is there */ @Test - public void testQRMBeforePut() { - try { - HARegionQueue regionqueue = createHARegionQueue("testing"); + public void testQRMBeforePut() throws Exception { + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName()); - EventID[] ids = new EventID[10]; + EventID[] ids = new EventID[10]; - for (int i = 0; i < 10; i++) { - ids[i] = new EventID(new byte[] {1}, 1, i); - } + for (int i = 0; i < 10; i++) { + ids[i] = new EventID(new byte[] {1}, 1, i); + } - // first get the qrm message for the seventh id - regionqueue.removeDispatchedEvents(ids[6]); - Conflatable[] cf = new Conflatable[10]; - // put 10 conflatable objects - for (int i = 0; i < 10; i++) { - cf[i] = new ConflatableObject("key" + i, "value", ids[i], true, "testing"); - regionqueue.put(cf[i]); - } + // first get the qrm message for the seventh id + regionQueue.removeDispatchedEvents(ids[6]); + Conflatable[] cf = new Conflatable[10]; - // verify 1-7 not in region - Set values = (Set) regionqueue.getRegion().values(); - for (int i = 0; i < 7; i++) { - System.out.println(i); - assertTrue(!values.contains(cf[i])); - } - // verify 8-10 still in region queue - for (int i = 7; i < 10; i++) { - System.out.println(i); - assertTrue(values.contains(cf[i])); - } - // verify 1-8 not in conflation map - for (long i = 0; i < 7; i++) { - assertTrue(!((Map) regionqueue.getConflationMapForTesting().get("testing")) - .containsKey("key" + i)); - } - // verify 8-10 in conflation map - for (long i = 7; i < 10; i++) { - assertTrue( - ((Map) regionqueue.getConflationMapForTesting().get("testing")).containsKey("key" + i)); - } + // put 10 conflatable objects + for (int i = 0; i < 10; i++) { + cf[i] = + new ConflatableObject("key" + i, "value", ids[i], true, this.testName.getMethodName()); + regionQueue.put(cf[i]); + } - EventID eid = new EventID(new byte[] {1}, 1, 6); - // verify 1-7 not in eventMap - for (long i = 4; i < 11; i++) { - assertTrue(!regionqueue.getCurrentCounterSet(eid).contains(new Long(i))); - } - // verify 8-10 in event Map - for (long i = 1; i < 4; i++) { - assertTrue(regionqueue.getCurrentCounterSet(eid).contains(new Long(i))); - } + // verify 1-7 not in region + Set values = (Set) regionQueue.getRegion().values(); - // verify 1-7 not in available Id's map - for (long i = 4; i < 11; i++) { - assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(i))); - } + for (int i = 0; i < 7; i++) { + System.out.println(i); + assertThat(!values.contains(cf[i]), is(true)); + } - // verify 8-10 in available id's map - for (long i = 1; i < 4; i++) { - assertTrue(regionqueue.getAvalaibleIds().contains(new Long(i))); - } - } catch (Exception e) { - throw new AssertionError("Exception occurred in test due to ", e); + // verify 8-10 still in region queue + for (int i = 7; i < 10; i++) { + System.out.println(i); + assertThat(values.contains(cf[i]), is(true)); + } + + // verify 1-8 not in conflation map + for (int i = 0; i < 7; i++) { + assertThat( + !((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())) + .containsKey("key" + i), + is(true)); + } + + // verify 8-10 in conflation map + for (int i = 7; i < 10; i++) { + assertThat(((Map) regionQueue.getConflationMapForTesting().get(this.testName.getMethodName())) + .containsKey("key" + i), is(true)); + } + + EventID eid = new EventID(new byte[] {1}, 1, 6); + + // verify 1-7 not in eventMap + for (int i = 4; i < 11; i++) { + assertThat(!regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); + } + + // verify 8-10 in event Map + for (int i = 1; i < 4; i++) { + assertThat(regionQueue.getCurrentCounterSet(eid).contains((long) i), is(true)); + } + + // verify 1-7 not in available Id's map + for (int i = 4; i < 11; i++) { + assertThat(!regionQueue.getAvalaibleIds().contains((long) i), is(true)); + } + + // verify 8-10 in available id's map + for (int i = 1; i < 4; i++) { + assertThat(regionQueue.getAvalaibleIds().contains((long) i), is(true)); } } @@ -721,33 +529,33 @@ public class HARegionQueueJUnitTest { * test to verify conflation happens as expected */ @Test - public void testConflation() { - try { - HARegionQueue regionqueue = createHARegionQueue("testing"); - EventID ev1 = new EventID(new byte[] {1}, 1, 1); - EventID ev2 = new EventID(new byte[] {1}, 2, 2); - Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing"); - Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing"); - regionqueue.put(cf1); - Map conflationMap = regionqueue.getConflationMapForTesting(); - assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(1))); - regionqueue.put(cf2); - // verify the conflation map has recorded the new key - assertTrue(((Map) (conflationMap.get("testing"))).get("key").equals(new Long(2))); - // the old key should not be present - assertTrue(!regionqueue.getRegion().containsKey(new Long(1))); - // available ids should not contain the old id (the old position) - assertTrue(!regionqueue.getAvalaibleIds().contains(new Long(1))); - // available id should have the new id (the new position) - assertTrue(regionqueue.getAvalaibleIds().contains(new Long(2))); - // events map should not contain the old position - assertTrue(regionqueue.getCurrentCounterSet(ev1).isEmpty()); - // events map should contain the new position - assertTrue(regionqueue.getCurrentCounterSet(ev2).contains(new Long(2))); - - } catch (Exception e) { - throw new AssertionError("Exception occurred in test due to ", e); - } + public void testConflation() throws Exception { + HARegionQueue regionQueue = createHARegionQueue(this.testName.getMethodName()); + EventID ev1 = new EventID(new byte[] {1}, 1, 1); + EventID ev2 = new EventID(new byte[] {1}, 2, 2); + Conflatable cf1 = + new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName()); + Conflatable cf2 = + new ConflatableObject("key", "value2", ev2, true, this.testName.getMethodName()); + regionQueue.put(cf1); + + Map conflationMap = regionQueue.getConflationMapForTesting(); + assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(1L)); + + regionQueue.put(cf2); + + // verify the conflation map has recorded the new key + assertThat(((Map) conflationMap.get(this.testName.getMethodName())).get("key"), is(2L)); + // the old key should not be present + assertThat(!regionQueue.getRegion().containsKey(1L), is(true)); + // available ids should not contain the old id (the old position) + assertThat(!regionQueue.getAvalaibleIds().contains(1L), is(true)); + // available id should have the new id (the new position) + assertThat(regionQueue.getAvalaibleIds().contains(2L), is(true)); + // events map should not contain the old position + assertThat(regionQueue.getCurrentCounterSet(ev1).isEmpty(), is(true)); + // events map should contain the new position + assertThat(regionQueue.getCurrentCounterSet(ev2).contains(2L), is(true)); } /** @@ -755,97 +563,58 @@ public class HARegionQueueJUnitTest { * events which are of ID greater than that contained in QRM should stay */ @Test - public void testQRM() { - try { - RegionQueue regionqueue = createHARegionQueue("testing"); - for (int i = 0; i < 10; ++i) { - regionqueue.put(new ConflatableObject("key" + (i + 1), "value", - new EventID(new byte[] {1}, 1, i + 1), true, "testing")); - } - EventID qrmID = new EventID(new byte[] {1}, 1, 5); - ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID); - Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting(); - assertTrue(((Map) (conflationMap.get("testing"))).size() == 5); - - Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds(); - Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID); - assertTrue(availableIDs.size() == 5); - assertTrue(counters.size() == 5); - for (int i = 5; i < 10; ++i) { - assertTrue(((Map) (conflationMap.get("testing"))).containsKey("key" + (i + 1))); - assertTrue(availableIDs.contains(new Long((i + 1)))); - assertTrue(counters.contains(new Long((i + 1)))); - } - Region rgn = ((HARegionQueue) regionqueue).getRegion(); - assertTrue(rgn.keySet().size() == 6); + public void testQRM() throws Exception { + RegionQueue regionqueue = createHARegionQueue(this.testName.getMethodName()); - } catch (Exception e) { - throw new AssertionError("Exception occurred in test due to ", e); + for (int i = 0; i < 10; ++i) { + regionqueue.put(new ConflatableObject("key" + (i + 1), "value", + new EventID(new byte[] {1}, 1, i + 1), true, this.testName.getMethodName())); } - } - protected static HARegionQueue hrqFortestSafeConflationRemoval; + EventID qrmID = new EventID(new byte[] {1}, 1, 5); + ((HARegionQueue) regionqueue).removeDispatchedEvents(qrmID); + Map conflationMap = ((HARegionQueue) regionqueue).getConflationMapForTesting(); + assertThat(((Map) conflationMap.get(this.testName.getMethodName())).size(), is(5)); - /** - * This test tests safe removal from the conflation map. i.e operations should only remove old - * values and not the latest value - */ - @Test - public void testSafeConflationRemoval() { - try { - hrqFortestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval", + Set availableIDs = ((HARegionQueue) regionqueue).getAvalaibleIds(); + Set counters = ((HARegionQueue) regionqueue).getCurrentCounterSet(qrmID); - cache, this); - Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1), - true, "testSafeConflationRemoval"); - hrqFortestSafeConflationRemoval.put(cf1); - hrqFortestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1)); - Map map = - - (Map) hrqFortestSafeConflationRemoval.getConflationMapForTesting() - .get("testSafeConflationRemoval"); - assertTrue( - "Expected the counter to be 2 since it should not have been deleted but it is not so ", - map.get("key1").equals(new Long(2))); - hrqFortestSafeConflationRemoval = null; - } catch (Exception e) { - throw new AssertionError("Test failed due to ", e); + assertThat(availableIDs.size(), is(5)); + assertThat(counters.size(), is(5)); + + for (int i = 5; i < 10; ++i) { + assertThat( + ((Map) (conflationMap.get(this.testName.getMethodName()))).containsKey("key" + (i + 1)), + is(true)); + assertThat(availableIDs.contains((long) (i + 1)), is(true)); + assertThat(counters.contains((long) (i + 1)), is(true)); } + + Region rgn = ((HARegionQueue) regionqueue).getRegion(); + assertThat(rgn.keySet().size(), is(6)); } /** - * Extends HARegionQueue for testing purposes. used by testSafeConflationRemoval + * This test tests safe removal from the conflation map. i.e operations should only remove old + * values and not the latest value */ - static class HARQTestClass extends HARegionQueue.TestOnlyHARegionQueue { + @Test + public void testSafeConflationRemoval() throws Exception { + hrqForTestSafeConflationRemoval = new HARQTestClass("testSafeConflationRemoval", this.cache); + Conflatable cf1 = new ConflatableObject("key1", "value", new EventID(new byte[] {1}, 1, 1), + true, "testSafeConflationRemoval"); - public HARQTestClass(String REGION_NAME, InternalCache cache, HARegionQueueJUnitTest test) - throws IOException, ClassNotFoundException, CacheException, InterruptedException { - super(REGION_NAME, cache); - } + hrqForTestSafeConflationRemoval.put(cf1); + hrqForTestSafeConflationRemoval.removeDispatchedEvents(new EventID(new byte[] {1}, 1, 1)); - ConcurrentMap createConcurrentMap() { - return new ConcHashMap(); - } - } + Map map = (Map) hrqForTestSafeConflationRemoval.getConflationMapForTesting() + .get("testSafeConflationRemoval"); - /** - * Used to override the remove method for testSafeConflationRemoval - */ - static class ConcHashMap extends ConcurrentHashMap implements ConcurrentMap { - public boolean remove(Object arg0, Object arg1) { - Conflatable cf2 = new ConflatableObject("key1", "value2", new EventID(new byte[] {1}, 1, 2), - true, "testSafeConflationRemoval"); - try { - hrqFortestSafeConflationRemoval.put(cf2); - } catch (Exception e) { - throw new AssertionError("Exception occurred in trying to put ", e); - } - return super.remove(arg0, arg1); - } + assertThat( + "Expected the counter to be 2 since it should not have been deleted but it is not so ", + map.get("key1"), is(2L)); } - static List list1; - /** * This test tests remove operation is causing the insertion of sequence ID for existing * ThreadIdentifier object and concurrently the QRM thread is iterating over the Map to form the @@ -864,80 +633,86 @@ public class HARegionQueueJUnitTest { * It is then verified to see that all the sequence should be greater than x */ @Test - public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() { - try { - final long numberOfIterations = 1000; - final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval"); - HARegionQueue.stopQRMThread(); - final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; - for (int i = 0; i < numberOfIterations; i++) { - ids[i] = new ThreadIdentifier(new byte[] {1}, i); - hrq.addDispatchedMessage(ids[i], i); - } - Thread thread1 = new Thread() { - public void run() { - try { - Thread.sleep(600); - } catch (InterruptedException e) { - fail("interrupted"); - } - list1 = HARegionQueue.createMessageListForTesting(); - }; - }; - Thread thread2 = new Thread() { - public void run() { - try { - Thread.sleep(480); - } catch (InterruptedException e) { - fail("interrupted"); - } - for (int i = 0; i < numberOfIterations; i++) { - hrq.addDispatchedMessage(ids[i], i + numberOfIterations); - } - }; - }; - thread1.start(); - thread2.start(); - ThreadUtils.join(thread1, 30 * 1000); - ThreadUtils.join(thread2, 30 * 1000); - List list2 = HARegionQueue.createMessageListForTesting(); - Iterator iterator = list1.iterator(); - boolean doOnce = false; - EventID id = null; - Map map = new HashMap(); - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); - iterator.next(); - doOnce = true; - } else { - id = (EventID) iterator.next(); - map.put(new Long(id.getThreadID()), new Long(id.getSequenceID())); + public void testConcurrentDispatcherAndRemovalForSameRegionSameThreadId() throws Exception { + long numberOfIterations = 1000; + HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName()); + HARegionQueue.stopQRMThread(); + ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; + + for (int i = 0; i < numberOfIterations; i++) { + ids[i] = new ThreadIdentifier(new byte[] {1}, i); + hrq.addDispatchedMessage(ids[i], i); + } + + Thread thread1 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(600); + } catch (InterruptedException e) { + errorCollector.addError(e); } + list1 = HARegionQueue.createMessageListForTesting(); } - iterator = list2.iterator(); - doOnce = false; - id = null; - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); - iterator.next(); - doOnce = true; - } else { - id = (EventID) iterator.next(); - map.put(new Long(id.getThreadID()), new Long(id.getSequenceID())); + }; + + Thread thread2 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(480); + } catch (InterruptedException e) { + errorCollector.addError(e); + } + for (int i = 0; i < numberOfIterations; i++) { + hrq.addDispatchedMessage(ids[i], i + numberOfIterations); } } - iterator = map.values().iterator(); - Long max = new Long(numberOfIterations); - Long next; - while (iterator.hasNext()) { - next = ((Long) iterator.next()); - assertTrue(" Expected all the sequence ID's to be greater than " + max - + " but it is not so. Got sequence id " + next, next.compareTo(max) >= 0); + }; + + thread1.start(); + thread2.start(); + ThreadUtils.join(thread1, 30 * 1000); + ThreadUtils.join(thread2, 30 * 1000); + List list2 = HARegionQueue.createMessageListForTesting(); + Iterator iterator = list1.iterator(); + boolean doOnce = false; + EventID id; + Map map = new HashMap(); + + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); + iterator.next(); + doOnce = true; + } else { + id = (EventID) iterator.next(); + map.put(new Long(id.getThreadID()), id.getSequenceID()); + } + } + + iterator = list2.iterator(); + doOnce = false; + + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); + iterator.next(); + doOnce = true; + } else { + id = (EventID) iterator.next(); + map.put(id.getThreadID(), id.getSequenceID()); } - } catch (Exception e) { - throw new AssertionError("Test failed due to : ", e); + } + + iterator = map.values().iterator(); + Long max = numberOfIterations; + while (iterator.hasNext()) { + Long next = (Long) iterator.next(); + assertThat( + " Expected all the sequence ID's to be greater than " + max + + " but it is not so. Got sequence id " + next, + next.compareTo(max), greaterThanOrEqualTo(0)); } } @@ -958,77 +733,81 @@ public class HARegionQueueJUnitTest { * It is then verified to see that the map size should be 2x */ @Test - public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() { - try { - final long numberOfIterations = 1000; - final HARegionQueue hrq = createHARegionQueue("testConcurrentDispatcherAndRemoval"); - HARegionQueue.stopQRMThread(); - final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; - for (int i = 0; i < numberOfIterations; i++) { - ids[i] = new ThreadIdentifier(new byte[] {1}, i); - hrq.addDispatchedMessage(ids[i], i); - } - Thread thread1 = new Thread() { - public void run() { - try { - Thread.sleep(600); - } catch (InterruptedException e) { - fail("interrupted"); - } - list1 = HARegionQueue.createMessageListForTesting(); - }; - }; - Thread thread2 = new Thread() { - public void run() { - try { - Thread.sleep(480); - } catch (InterruptedException e) { - fail("interrupted"); - } - for (int i = 0; i < numberOfIterations; i++) { - ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations); - hrq.addDispatchedMessage(ids[i], i + numberOfIterations); - } - }; - }; - thread1.start(); - thread2.start(); - ThreadUtils.join(thread1, 30 * 1000); - ThreadUtils.join(thread2, 30 * 1000); - List list2 = HARegionQueue.createMessageListForTesting(); - Iterator iterator = list1.iterator(); - boolean doOnce = false; - EventID id = null; - Map map = new HashMap(); - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); - iterator.next(); - doOnce = true; - } else { - id = (EventID) iterator.next(); - map.put(new Long(id.getThreadID()), new Long(id.getSequenceID())); + public void testConcurrentDispatcherAndRemovalForSameRegionDifferentThreadId() throws Exception { + int numberOfIterations = 1000; + HARegionQueue hrq = createHARegionQueue(this.testName.getMethodName()); + HARegionQueue.stopQRMThread(); + ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; + + for (int i = 0; i < numberOfIterations; i++) { + ids[i] = new ThreadIdentifier(new byte[] {1}, i); + hrq.addDispatchedMessage(ids[i], i); + } + + Thread thread1 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(600); + } catch (InterruptedException e) { + errorCollector.addError(e); } + list1 = HARegionQueue.createMessageListForTesting(); } - iterator = list2.iterator(); - doOnce = false; - id = null; - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); - iterator.next(); - doOnce = true; - } else { - id = (EventID) iterator.next(); - map.put(new Long(id.getThreadID()), new Long(id.getSequenceID())); + }; + + Thread thread2 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(480); + } catch (InterruptedException e) { + errorCollector.addError(e); + } + for (int i = 0; i < numberOfIterations; i++) { + ids[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations); + hrq.addDispatchedMessage(ids[i], i + numberOfIterations); } } - assertTrue( - " Expected the map size to be " + (2 * numberOfIterations) + " but it is " + map.size(), - map.size() == (2 * numberOfIterations)); - } catch (Exception e) { - throw new AssertionError("Test failed due to an unexpected exception : ", e); + }; + + thread1.start(); + thread2.start(); + ThreadUtils.join(thread1, 30 * 1000); + ThreadUtils.join(thread2, 30 * 1000); + List list2 = HARegionQueue.createMessageListForTesting(); + Iterator iterator = list1.iterator(); + boolean doOnce = false; + EventID id; + Map map = new HashMap(); + + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); + iterator.next(); + doOnce = true; + } else { + id = (EventID) iterator.next(); + map.put(id.getThreadID(), id.getSequenceID()); + } + } + + iterator = list2.iterator(); + doOnce = false; + + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); + iterator.next(); + doOnce = true; + } else { + id = (EventID) iterator.next(); + map.put(id.getThreadID(), id.getSequenceID()); + } } + assertThat( + " Expected the map size to be " + 2 * numberOfIterations + " but it is " + map.size(), + map.size(), is(2 * numberOfIterations)); } /** @@ -1050,101 +829,96 @@ public class HARegionQueueJUnitTest { * It is then verified to see that a total of x entries are present in the map */ @Test - public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() { - try { - final long numberOfIterations = 10000; - final HARegionQueue hrq1 = createHARegionQueue("testConcurrentDispatcherAndRemoval1"); - - final HARegionQueue hrq2 = createHARegionQueue("testConcurrentDispatcherAndRemoval2"); + public void testConcurrentDispatcherAndRemovalForMultipleRegionsSameThreadId() throws Exception { + int numberOfIterations = 10000; + HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1"); + HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2"); + HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3"); + HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4"); + HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5"); - final HARegionQueue hrq3 = createHARegionQueue("testConcurrentDispatcherAndRemoval3"); + HARegionQueue.stopQRMThread(); - final HARegionQueue hrq4 = createHARegionQueue("testConcurrentDispatcherAndRemoval4"); + ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; - final HARegionQueue hrq5 = createHARegionQueue("testConcurrentDispatcherAndRemoval5"); + for (int i = 0; i < numberOfIterations; i++) { + ids[i] = new ThreadIdentifier(new byte[] {1}, i); + hrq1.addDispatchedMessage(ids[i], i); + hrq2.addDispatchedMessage(ids[i], i); - HARegionQueue.stopQRMThread(); - final ThreadIdentifier[] ids = new ThreadIdentifier[(int) numberOfIterations]; - - for (int i = 0; i < numberOfIterations; i++) { - ids[i] = new ThreadIdentifier(new byte[] {1}, i); - hrq1.addDispatchedMessage(ids[i], i); - hrq2.addDispatchedMessage(ids[i], i); + } + Thread thread1 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(600); + } catch (InterruptedException e) { + errorCollector.addError(e); + } + list1 = HARegionQueue.createMessageListForTesting(); } + }; - Thread thread1 = new Thread() { - public void run() { - try { - Thread.sleep(600); - } catch (InterruptedException e) { - fail("interrupted"); - } - list1 = HARegionQueue.createMessageListForTesting(); - }; - }; - Thread thread2 = new Thread() { - public void run() { - try { - Thread.sleep(480); - } catch (InterruptedException e) { - fail("interrupted"); - } - for (int i = 0; i < numberOfIterations; i++) { - hrq3.addDispatchedMessage(ids[i], i); - hrq4.addDispatchedMessage(ids[i], i); - hrq5.addDispatchedMessage(ids[i], i); - } - }; - }; - thread1.start(); - thread2.start(); - ThreadUtils.join(thread1, 30 * 1000); - ThreadUtils.join(thread2, 30 * 1000); - List list2 = HARegionQueue.createMessageListForTesting(); - Iterator iterator = list1.iterator(); - boolean doOnce = true; - EventID id = null; - Map map = new HashMap(); - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); // read the total message size - doOnce = true; - } else { - iterator.next();// region name; - int size = ((Integer) iterator.next()).intValue(); - for (int i = 0; i < size; i++) { - id = (EventID) iterator.next(); - map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), - new Long(id.getSequenceID())); - } + Thread thread2 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(480); + } catch (InterruptedException e) { + errorCollector.addError(e); + } + for (int i = 0; i < numberOfIterations; i++) { + hrq3.addDispatchedMessage(ids[i], i); + hrq4.addDispatchedMessage(ids[i], i); + hrq5.addDispatchedMessage(ids[i], i); } } - - iterator = list2.iterator(); - doOnce = true; - id = null; - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); // read the total message size - doOnce = true; - } else { - iterator.next();// region name; - int size = ((Integer) iterator.next()).intValue(); - for (int i = 0; i < size; i++) { - id = (EventID) iterator.next(); - map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), - new Long(id.getSequenceID())); - } + }; + + thread1.start(); + thread2.start(); + ThreadUtils.join(thread1, 30 * 1000); + ThreadUtils.join(thread2, 30 * 1000); + List list2 = HARegionQueue.createMessageListForTesting(); + Iterator iterator = list1.iterator(); + boolean doOnce = true; + EventID id; + Map map = new HashMap(); + + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); // read the total message size + doOnce = true; + } else { + iterator.next();// region name; + int size = (Integer) iterator.next(); + for (int i = 0; i < size; i++) { + id = (EventID) iterator.next(); + map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); } } - assertTrue( - " Expected the map size to be " + (numberOfIterations) + " but it is " + map.size(), - map.size() == (numberOfIterations)); + } - } catch (Exception e) { - throw new AssertionError("Test failed due to : ", e); + iterator = list2.iterator(); + doOnce = true; + + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); // read the total message size + doOnce = true; + } else { + iterator.next();// region name; + int size = (Integer) iterator.next(); + for (int i = 0; i < size; i++) { + id = (EventID) iterator.next(); + map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); + } + } } + + assertThat(" Expected the map size to be " + numberOfIterations + " but it is " + map.size(), + map.size(), is(numberOfIterations)); } /** @@ -1168,203 +942,179 @@ public class HARegionQueueJUnitTest { * It is then verified to see that the map size should be 2x * number of regions */ @Test - public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId() { - try { - final long numberOfIterations = 1000; - final HARegionQueue hrq1 = - - createHARegionQueue("testConcurrentDispatcherAndRemoval1"); - - final HARegionQueue hrq2 = - - createHARegionQueue("testConcurrentDispatcherAndRemoval2"); - final HARegionQueue hrq3 = - - createHARegionQueue("testConcurrentDispatcherAndRemoval3"); - final HARegionQueue hrq4 = - - createHARegionQueue("testConcurrentDispatcherAndRemoval4"); - final HARegionQueue hrq5 = - - createHARegionQueue("testConcurrentDispatcherAndRemoval5"); - - HARegionQueue.stopQRMThread(); - - final ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations]; - final ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations]; - final ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations]; - final ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations]; - final ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations]; + public void testConcurrentDispatcherAndRemovalForMultipleRegionsDifferentThreadId() + throws Exception { + int numberOfIterations = 1000; + HARegionQueue hrq1 = createHARegionQueue(this.testName.getMethodName() + "-1"); + HARegionQueue hrq2 = createHARegionQueue(this.testName.getMethodName() + "-2"); + HARegionQueue hrq3 = createHARegionQueue(this.testName.getMethodName() + "-3"); + HARegionQueue hrq4 = createHARegionQueue(this.testName.getMethodName() + "-4"); + HARegionQueue hrq5 = createHARegionQueue(this.testName.getMethodName() + "-5"); + + HARegionQueue.stopQRMThread(); + + ThreadIdentifier[] ids1 = new ThreadIdentifier[(int) numberOfIterations]; + ThreadIdentifier[] ids2 = new ThreadIdentifier[(int) numberOfIterations]; + ThreadIdentifier[] ids3 = new ThreadIdentifier[(int) numberOfIterations]; + ThreadIdentifier[] ids4 = new ThreadIdentifier[(int) numberOfIterations]; + ThreadIdentifier[] ids5 = new ThreadIdentifier[(int) numberOfIterations]; + + for (int i = 0; i < numberOfIterations; i++) { + ids1[i] = new ThreadIdentifier(new byte[] {1}, i); + ids2[i] = new ThreadIdentifier(new byte[] {2}, i); + ids3[i] = new ThreadIdentifier(new byte[] {3}, i); + ids4[i] = new ThreadIdentifier(new byte[] {4}, i); + ids5[i] = new ThreadIdentifier(new byte[] {5}, i); + hrq1.addDispatchedMessage(ids1[i], i); + hrq2.addDispatchedMessage(ids2[i], i); + hrq3.addDispatchedMessage(ids3[i], i); + hrq4.addDispatchedMessage(ids4[i], i); + hrq5.addDispatchedMessage(ids5[i], i); + } - for (int i = 0; i < numberOfIterations; i++) { - ids1[i] = new ThreadIdentifier(new byte[] {1}, i); - ids2[i] = new ThreadIdentifier(new byte[] {2}, i); - ids3[i] = new ThreadIdentifier(new byte[] {3}, i); - ids4[i] = new ThreadIdentifier(new byte[] {4}, i); - ids5[i] = new ThreadIdentifier(new byte[] {5}, i); - hrq1.addDispatchedMessage(ids1[i], i); - hrq2.addDispatchedMessage(ids2[i], i); - hrq3.addDispatchedMessage(ids3[i], i); - hrq4.addDispatchedMessage(ids4[i], i); - hrq5.addDispatchedMessage(ids5[i], i); + Thread thread1 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(600); + } catch (InterruptedException e) { + errorCollector.addError(e); + } + list1 = HARegionQueue.createMessageListForTesting(); } + }; - Thread thread1 = new Thread() { - public void run() { - try { - Thread.sleep(600); - } catch (InterruptedException e) { - fail("interrupted"); - } - list1 = HARegionQueue.createMessageListForTesting(); - }; - }; - Thread thread2 = new Thread() { - public void run() { - try { - Thread.sleep(480); - } catch (InterruptedException e) { - fail("Interrupted"); - } - for (int i = 0; i < numberOfIterations; i++) { - ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations); - ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations); - ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations); - ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations); - ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations); - - hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations); - hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations); - hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations); - hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations); - hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations); - } - }; - }; - thread1.start(); - thread2.start(); - ThreadUtils.join(thread1, 30 * 1000); - ThreadUtils.join(thread2, 30 * 1000); - List list2 = HARegionQueue.createMessageListForTesting(); - Iterator iterator = list1.iterator(); - boolean doOnce = true; - EventID id = null; - Map map = new HashMap(); - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); // read the total message size - doOnce = true; - } else { - iterator.next();// region name; - int size = ((Integer) iterator.next()).intValue(); - System.out.println(" size of list 1 iteration x " + size); - for (int i = 0; i < size; i++) { - - id = (EventID) iterator.next(); - map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), - new Long(id.getSequenceID())); - } + Thread thread2 = new Thread() { + @Override + public void run() { + try { + Thread.sleep(480); + } catch (InterruptedException e) { + errorCollector.addError(e); + } + for (int i = 0; i < numberOfIterations; i++) { + ids1[i] = new ThreadIdentifier(new byte[] {1}, i + numberOfIterations); + ids2[i] = new ThreadIdentifier(new byte[] {2}, i + numberOfIterations); + ids3[i] = new ThreadIdentifier(new byte[] {3}, i + numberOfIterations); + ids4[i] = new ThreadIdentifier(new byte[] {4}, i + numberOfIterations); + ids5[i] = new ThreadIdentifier(new byte[] {5}, i + numberOfIterations); + + hrq1.addDispatchedMessage(ids1[i], i + numberOfIterations); + hrq2.addDispatchedMessage(ids2[i], i + numberOfIterations); + hrq3.addDispatchedMessage(ids3[i], i + numberOfIterations); + hrq4.addDispatchedMessage(ids4[i], i + numberOfIterations); + hrq5.addDispatchedMessage(ids5[i], i + numberOfIterations); } } - - iterator = list2.iterator(); - doOnce = true; - id = null; - while (iterator.hasNext()) { - if (!doOnce) { - iterator.next(); // read the total message size - doOnce = true; - } else { - iterator.next();// region name; - int size = ((Integer) iterator.next()).intValue(); - System.out.println(" size of list 2 iteration x " + size); - for (int i = 0; i < size; i++) { - id = (EventID) iterator.next(); - map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), - new Long(id.getSequenceID())); - } + }; + + thread1.start(); + thread2.start(); + ThreadUtils.join(thread1, 30 * 1000); + ThreadUtils.join(thread2, 30 * 1000); + List list2 = HARegionQueue.createMessageListForTesting(); + Iterator iterator = list1.iterator(); + boolean doOnce = true; + EventID id = null; + Map map = new HashMap(); + + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); // read the total message size + doOnce = true; + } else { + iterator.next(); // region name; + int size = (Integer) iterator.next(); + System.out.println(" size of list 1 iteration x " + size); + for (int i = 0; i < size; i++) { + id = (EventID) iterator.next(); + map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); } } + } - assertTrue(" Expected the map size to be " + (numberOfIterations * 2 * 5) + " but it is " - + map.size(), map.size() == (numberOfIterations * 2 * 5)); + iterator = list2.iterator(); + doOnce = true; - } catch (Exception e) { - throw new AssertionError("Test failed due to : ", e); + while (iterator.hasNext()) { + if (!doOnce) { + iterator.next(); // read the total message size + doOnce = true; + } else { + iterator.next(); // region name; + int size = (Integer) iterator.next(); + System.out.println(" size of list 2 iteration x " + size); + for (int i = 0; i < size; i++) { + id = (EventID) iterator.next(); + map.put(new ThreadIdentifier(id.getMembershipID(), id.getThreadID()), id.getSequenceID()); + } + } } + + assertThat( + " Expected the map size to be " + numberOfIterations * 2 * 5 + " but it is " + map.size(), + map.size(), is(numberOfIterations * 2 * 5)); } /** - * Concurrent Peek on Blokcing Queue waiting with for a Put . If concurrent take is also happening + * Concurrent Peek on Blocking Queue waiting with for a Put . If concurrent take is also happening * such that the object is removed first then the peek should block & not return with null. */ @Test - public void testBlockingQueueForConcurrentPeekAndTake() { - exceptionInThread = false; - testFailed = false; - try { - final TestBlockingHARegionQueue bQ = - new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", cache); - Thread[] threads = new Thread[3]; - for (int i = 0; i < 3; i++) { - threads[i] = new Thread() { - public void run() { - try { - long startTime = System.currentTimeMillis(); - Object obj = bQ.peek(); - if (obj == null) { - testFailed = true; - message.append( - " Failed : failed since object was null and was not expected to be null \n"); - } - long totalTime = System.currentTimeMillis() - startTime; + public void testBlockingQueueForConcurrentPeekAndTake() throws Exception { + TestBlockingHARegionQueue regionQueue = + new TestBlockingHARegionQueue("testBlockQueueForConcurrentPeekAndTake", this.cache); + Thread[] threads = new Thread[3]; + + for (int i = 0; i < 3; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + long startTime = System.currentTimeMillis(); + Object obj = regionQueue.peek(); + if (obj == null) { + errorCollector.addError(new AssertionError( + "Failed : failed since object was null and was not expected to be null")); + } + long totalTime = System.currentTimeMillis() - startTime; - if (totalTime < 4000) { - testFailed = true; - message - .append(" Failed : Expected time to be greater than 4000 but it is not so "); - } - } catch (Exception e) { - exceptionInThread = true; - exception = e; + if (totalTime < 4000) { + errorCollector.addError(new AssertionError( + "Failed : Expected time to be greater than 4000 but it is not so")); } + } catch (Exception e) { + errorCollector.addError(e); } - }; - - } - - for (int k = 0; k < 3; k++) { - threads[k].start(); - } - Thread.sleep(4000); - - EventID id = new EventID(new byte[] {1}, 1, 1); - EventID id1 = new EventID(new byte[] {1}, 1, 2); + } + }; + } - bQ.takeFirst = true; - bQ.put(new ConflatableObject("key", "value", id, true, "testing")); + for (int k = 0; k < 3; k++) { + threads[k].start(); + } - Thread.sleep(2000); + Thread.sleep(4000); - bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing")); + EventID id = new EventID(new byte[] {1}, 1, 1); + EventID id1 = new EventID(new byte[] {1}, 1, 2); - long startTime = System.currentTimeMillis(); - for (int k = 0; k < 3; k++) { - ThreadUtils.join(threads[k], 180 * 1000); - } + regionQueue.takeFirst = true; + regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName())); - long totalTime = System.currentTimeMillis() - startTime; + Thread.sleep(2000); - if (totalTime >= 180000) { - fail(" Test taken too long "); - } + regionQueue + .put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName())); - if (testFailed) { - fail(" test failed due to " + message); - } + long startTime = System.currentTimeMillis(); + for (int k = 0; k < 3; k++) { + ThreadUtils.join(threads[k], 180 * 1000); + } - } catch (Exception e) { - throw new AssertionError(" Test failed due to ", e); + long totalTime = System.currentTimeMillis() - startTime; + if (totalTime >= 180000) { + fail(" Test taken too long "); } } @@ -1373,71 +1123,60 @@ public class HARegionQueueJUnitTest { * QRM thread , the peek should block correctly. */ @Test - public void testBlockingQueueForTakeWhenPeekInProgress() { - exceptionInThread = false; - testFailed = false; - try { - final TestBlockingHARegionQueue bQ = - new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", cache); - Thread[] threads = new Thread[3]; - for (int i = 0; i < 3; i++) { - threads[i] = new Thread() { - public void run() { - try { - long startTime = System.currentTimeMillis(); - Object obj = bQ.peek(); - if (obj == null) { - testFailed = true; - message.append( - " Failed : failed since object was null and was not expected to be null \n"); - } - long totalTime = System.currentTimeMillis() - startTime; + public void testBlockingQueueForTakeWhenPeekInProgress() throws Exception { + TestBlockingHARegionQueue regionQueue = + new TestBlockingHARegionQueue("testBlockQueueForTakeWhenPeekInProgress", this.cache); + Thread[] threads = new Thread[3]; + + for (int i = 0; i < 3; i++) { + threads[i] = new Thread() { + @Override + public void run() { + try { + long startTime = System.currentTimeMillis(); + Object obj = regionQueue.peek(); + if (obj == null) { + errorCollector.addError(new AssertionError( + "Failed : failed since object was null and was not expected to be null")); + } + long totalTime = System.currentTimeMillis() - startTime; - if (totalTime < 4000) { - testFailed = true; - message - .append(" Failed : Expected time to be greater than 4000 but it is not so "); - } - } catch (Exception e) { - exceptionInThread = true; - exception = e; + if (totalTime < 4000) { + errorCollector.addError(new AssertionError( + "Failed : Expected time to be greater than 4000 but it is not so")); } + } catch (Exception e) { + errorCollector.addError(e); } - }; - } - - for (int k = 0; k < 3; k++) { - threads[k].start(); - } - Thread.sleep(4000); - - EventID id = new EventID(new byte[] {1}, 1, 1); - EventID id1 = new EventID(new byte[] {1}, 1, 2); + } + }; + } - bQ.takeWhenPeekInProgress = true; - bQ.put(new ConflatableObject("key", "value", id, true, "testing")); + for (int k = 0; k < 3; k++) { + threads[k].start(); + } - Thread.sleep(2000); + Thread.sleep(4000); - bQ.put(new ConflatableObject("key1", "value1", id1, true, "testing")); + EventID id = new EventID(new byte[] {1}, 1, 1); + EventID id1 = new EventID(new byte[] {1}, 1, 2); - long startTime = System.currentTimeMillis(); - for (int k = 0; k < 3; k++) { - ThreadUtils.join(threads[k], 60 * 1000); - } + regionQueue.takeWhenPeekInProgress = true; + regionQueue.put(new ConflatableObject("key", "value", id, true, this.testName.getMethodName())); - long totalTime = System.currentTimeMillis() - startTime; + Thread.sleep(2000); - if (totalTime >= 60000) { - fail(" Test taken too long "); - } + regionQueue + .put(new ConflatableObject("key1", "value1", id1, true, this.testName.getMethodName())); - if (testFailed) { - fail(" test failed due to " + message); - } + long startTime = System.currentTimeMillis(); + for (int k = 0; k < 3; k++) { + ThreadUtils.join(threads[k], 60 * 1000); + } - } catch (Exception e) { - throw new AssertionError(" Test failed due to ", e); + long totalTime = System.currentTimeMillis() - startTime; + if (totalTime >= 60000) { + fail(" Test taken too long "); } } @@ -1451,138 +1190,88 @@ public class HARegionQueueJUnitTest { * violation. This test will validate that behaviour */ @Test - public void testConcurrentEventExpiryAndTake() { - try { - HARegionQueueAttributes haa = new HARegionQueueAttributes(); - haa.setExpiryTime(3); - final RegionQueue regionqueue = - new HARegionQueue.TestOnlyHARegionQueue("testing", cache, haa) { - CacheListener createCacheListenerForHARegion() { + public void testConcurrentEventExpiryAndTake() throws Exception { + AtomicBoolean complete = new AtomicBoolean(false); + AtomicBoolean expiryCalled = new AtomicBoolean(false); + AtomicBoolean allowExpiryToProceed = new AtomicBoolean(false); - return new CacheListenerAdapter() { + HARegionQueueAttributes haa = new HARegionQueueAttributes(); + haa.setExpiryTime(3); - public void afterInvalidate(EntryEvent event) { + RegionQueue regionqueue = + new HARegionQueue.TestOnlyHARegionQueue(this.testName.getMethodName(), this.cache, haa) { + @Override + CacheListener createCacheListenerForHARegion() { - if (event.getKey() instanceof Long) { - synchronized (HARegionQueueJUnitTest.this) { - expiryCalled = true; - HARegionQueueJUnitTest.this.notify(); + return new CacheListenerAdapter() { - } ; - Thread.yield(); + @Override + public void afterInvalidate(EntryEvent event) { - synchronized (HARegionQueueJUnitTest.this) { - if (!allowExpiryToProceed) { - try { - HARegionQueueJUnitTest.this.wait(); - } catch (InterruptedException e1) { - encounteredException = true; - } + if (event.getKey() instanceof Long) { + synchronized (HARegionQueueJUnitTest.this) { + expiryCalled.set(true); + HARegionQueueJUnitTest.this.notifyAll(); + } + + Thread.yield(); + + synchronized (HARegionQueueJUnitTest.this) { + while (!allowExpiryToProceed.get()) { + try { + HARegionQueueJUnitTest.this.wait(); + } catch (InterruptedException e) { + errorCollector.addError(e); + break; } } - try { - expireTheEventOrThreadIdentifier(event); - } catch (CacheException e) { - e.printStackTrace(); - encounteredException = true; - } finally { - synchronized (HARegionQueueJUnitTest.this) { - complete = true; - HARegionQueueJUnitTest.this.notify(); - } + } + + try { + expireTheEventOrThreadIdentifier(event); + } catch (CacheException e) { + errorCollector.addError(e); + } finally { + synchronized (HARegionQueueJUnitTest.this) { + complete.set(true); + HARegionQueueJUnitTest.this.notifyAll(); } } } - }; - } - }; - EventID ev1 = new EventID(new byte[] {1}, 1, 1); + } + }; + } + }; - Conflatable cf1 = new ConflatableObject("key", "value", ev1, true, "testing"); + EventID ev1 = new EventID(new byte[] {1}, 1, 1); + Conflatable cf1 = + new ConflatableObject("key", "value", ev1, true, this.testName.getMethodName()); + regionqueue.put(cf1); - regionqueue.put(cf1); - synchronized (this) { - if (!expiryCalled) { - this.wait(); - } - } - try { - Object o = regionqueue.take(); - assertNull(o); - } catch (Exception e) { - throw new AssertionError("Test failed due to exception ", e); - } finally { - synchronized (this) { - this.allowExpiryToProceed = true; - this.notify(); - } + synchronized (this) { +
<TRUNCATED>