GEODE-1351: make waitUntilFlushed() return false if it times out; also apply it in some unit tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/3e8a610e Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/3e8a610e Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/3e8a610e Branch: refs/heads/feature/GEODE-835-test Commit: 3e8a610e99bc005bcc56c378e3d1e2274a3d468c Parents: 34d3791 Author: zhouxh <gz...@pivotal.io> Authored: Mon May 16 17:12:15 2016 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Mon May 16 17:12:15 2016 -0700 ---------------------------------------------------------------------- .../gemfire/cache/lucene/LuceneIndex.java | 4 +++- .../cache/lucene/internal/LuceneIndexImpl.java | 11 ++++----- .../internal/xml/LuceneIndexCreation.java | 3 ++- .../LuceneIndexCreationIntegrationTest.java | 25 ++++++++++---------- .../LuceneIndexRecoveryHAIntegrationTest.java | 14 +++++------ 5 files changed, 29 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java index be329f7..6b1a4b4 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java @@ -58,7 +58,9 @@ public interface LuceneIndex { /* * wait until the current entries in cache are indexed + * @param maxWaitInMilliseconds max wait time in millisecond + * @return if entries are flushed within maxWait */ - public void waitUntilFlushed(int maxWait); + public boolean waitUntilFlushed(int maxWaitInMillisecond); } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java index 981d9e4..c165085 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java @@ -81,19 +81,17 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { searchableFieldNames = fields; } - /* - * For test and demo purpose. To use it, the data region should stop feeding - * A more advanced version is under-development - */ @Override - public void waitUntilFlushed(int maxWait) { + public boolean waitUntilFlushed(int maxWaitInMillisecond) { String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); AsyncEventQueue queue = (AsyncEventQueue)cache.getAsyncEventQueue(aeqId); + boolean flushed = false; if (queue != null) { long start = System.nanoTime(); - while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWait)) { + while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWaitInMillisecond)) { if (0 == queue.size()) { logger.debug("waitUntilFlushed: Queue size is 0"); + flushed = true; break; } else { try { @@ -105,6 +103,7 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { } else { throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+" region "+regionPath); } + return flushed; } @Override http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java 
---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java index b54f51b..a3bdd24 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java @@ -116,6 +116,7 @@ public class LuceneIndexCreation implements LuceneIndex, Extension<Region<?, ?>> } @Override - public void waitUntilFlushed(int maxWait) { + public boolean waitUntilFlushed(int maxWaitInMillisecond) { + return true; } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java index 6429143..fe754a4 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneIndexCreationIntegrationTest.java @@ -44,7 +44,6 @@ import com.gemstone.gemfire.internal.cache.BucketNotFoundException; import com.gemstone.gemfire.internal.cache.LocalRegion; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.test.junit.categories.IntegrationTest; -import com.jayway.awaitility.Awaitility; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.core.KeywordTokenizer; @@ -71,12 +70,10 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest { Region region = 
createRegion(); final LuceneIndex index = luceneService.getIndex(INDEX_NAME, REGION_NAME); region.put("key1", new TestObject()); - + verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME); assertEquals(analyzers, index.getFieldAnalyzers()); - Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { - assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields); - assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields); - }); + assertEquals(Arrays.asList("field1"), field1Analyzer.analyzedfields); + assertEquals(Arrays.asList("field2"), field2Analyzer.analyzedfields); } @Test @@ -188,26 +185,28 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest { cache.close(); createCache(); createIndex("field1", "field2"); + verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME); dataRegion = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT) .create(REGION_NAME); LuceneQuery<Object, Object> query = luceneService.createLuceneQueryFactory() .create(INDEX_NAME, REGION_NAME, "field1:world"); - Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { - assertEquals(1, query.search().size()); - }); + assertEquals(1, query.search().size()); } + private void verifyIndexFinishFlushing(String indexName, String regionName) { + LuceneIndex index = luceneService.getIndex(indexName, regionName); + boolean flushed = index.waitUntilFlushed(60000); + assertTrue(flushed); + } + @Test public void shouldRecoverPersistentIndexWhenDataIsWrittenToIndex() throws ParseException, InterruptedException { createIndex("field1", "field2"); Region dataRegion = cache.createRegionFactory(RegionShortcut.PARTITION_PERSISTENT) .create(REGION_NAME); dataRegion.put("A", new TestObject()); - final AsyncEventQueueImpl queue = (AsyncEventQueueImpl) getIndexQueue(); - - //Wait until the queue has drained - Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> assertEquals(0, queue.size())); + verifyIndexFinishFlushing(INDEX_NAME, REGION_NAME); 
cache.close(); createCache(); createIndex("text"); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/3e8a610e/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java index 77d2a5c..d32e6d8 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java @@ -41,6 +41,7 @@ import com.gemstone.gemfire.cache.Region; import com.gemstone.gemfire.cache.RegionFactory; import com.gemstone.gemfire.cache.RegionShortcut; import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.lucene.LuceneIndex; import com.gemstone.gemfire.cache.lucene.LuceneQuery; import com.gemstone.gemfire.cache.lucene.LuceneQueryResults; import com.gemstone.gemfire.cache.lucene.LuceneService; @@ -55,7 +56,6 @@ import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.PartitionedRegion; import com.gemstone.gemfire.test.junit.categories.FlakyTest; import com.gemstone.gemfire.test.junit.categories.IntegrationTest; -import com.jayway.awaitility.Awaitility; @Category(IntegrationTest.class) public class LuceneIndexRecoveryHAIntegrationTest { @@ -139,7 +139,7 @@ public class LuceneIndexRecoveryHAIntegrationTest { value = new Type1("lucene world", 1, 2L, 3.0, 4.0f); userRegion.put("value3", value); - waitUntilQueueEmpty(aeqId); + verifyIndexFinishFlushing(INDEX, REGION); LuceneQuery<Integer, Type1> query = service.createLuceneQueryFactory().create(INDEX, REGION, "s:world"); 
LuceneQueryResults<Integer, Type1> results = query.search(); @@ -190,7 +190,7 @@ public class LuceneIndexRecoveryHAIntegrationTest { value = new Type1("lucene world", 1, 2L, 3.0, 4.0f); userRegion.put("value3", value); - waitUntilQueueEmpty(aeqId); + verifyIndexFinishFlushing(INDEX, REGION); PartitionedRegion fileRegion = (PartitionedRegion) cache.getRegion(aeqId + ".files"); assertNotNull(fileRegion); @@ -203,9 +203,9 @@ public class LuceneIndexRecoveryHAIntegrationTest { Assert.assertEquals(3, results.size()); } - private void waitUntilQueueEmpty(final String aeqId) { - // TODO flush queue - AsyncEventQueue queue = cache.getAsyncEventQueue(aeqId); - Awaitility.waitAtMost(1000, TimeUnit.MILLISECONDS).until(() -> assertEquals(0, queue.size())); + private void verifyIndexFinishFlushing(String indexName, String regionName) { + LuceneIndex index = LuceneServiceProvider.get(cache).getIndex(indexName, regionName); + boolean flushed = index.waitUntilFlushed(60000); + assertTrue(flushed); } }