GEODE-1351: add waitUntilFlushed() into LuceneIndexImpl This function is very useful in tests and demos. It will wait for the AEQ size to be 0
Also added junit test and integration tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/34d37919 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/34d37919 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/34d37919 Branch: refs/heads/feature/GEODE-835-test Commit: 34d379196c3bf2129a391ce63a9890daa3d75b7b Parents: e1cbc9b Author: zhouxh <gz...@pivotal.io> Authored: Sun May 15 21:36:34 2016 -0700 Committer: zhouxh <gz...@pivotal.io> Committed: Mon May 16 11:42:05 2016 -0700 ---------------------------------------------------------------------- .../gemfire/cache/lucene/LuceneIndex.java | 5 ++ .../lucene/internal/LuceneEventListener.java | 11 +++ .../cache/lucene/internal/LuceneIndexImpl.java | 43 +++++++++-- .../internal/xml/LuceneIndexCreation.java | 4 + .../gemfire/cache/lucene/LuceneQueriesBase.java | 69 ++++++++++++++++- .../internal/LuceneIndexImplJUnitTest.java | 78 ++++++++++++++++++++ 6 files changed, 202 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/34d37919/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java index 743045b..be329f7 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/LuceneIndex.java @@ -56,4 +56,9 @@ public interface LuceneIndex { */ public Map<String, Analyzer> getFieldAnalyzers(); + /* + * wait until the current entries in cache are indexed + */ + public void waitUntilFlushed(int maxWait); + } 
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/34d37919/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java index 9fdfd43..2dae4ee 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneEventListener.java @@ -35,6 +35,8 @@ import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.repository.IndexRepository; import com.gemstone.gemfire.cache.query.internal.DefaultQuery; import com.gemstone.gemfire.internal.cache.BucketNotFoundException; +import com.gemstone.gemfire.internal.cache.CacheObserverHolder; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy.TestHook; import com.gemstone.gemfire.internal.logging.LogService; /** @@ -70,6 +72,10 @@ public class LuceneEventListener implements AsyncEventListener { IndexRepository repository = repositoryManager.getRepository(region, key, callbackArgument); Operation op = event.getOperation(); + + if (testHook != null) { + testHook.doTestHook("FOUND_AND_BEFORE_PROCESSING_A_EVENT"); + } if (op.isCreate()) { repository.create(key, event.getDeserializedValue()); @@ -96,4 +102,9 @@ public class LuceneEventListener implements AsyncEventListener { DefaultQuery.setPdxReadSerialized(false); } } + + public interface TestHook { + public void doTestHook(String spot); + } + public static TestHook testHook; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/34d37919/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java 
---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java index 0b5f8fa..981d9e4 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImpl.java @@ -21,23 +21,26 @@ package com.gemstone.gemfire.cache.lucene.internal; import java.util.Collections; import java.util.Map; +import java.util.concurrent.TimeUnit; -import com.gemstone.gemfire.InternalGemFireError; -import com.gemstone.gemfire.cache.Cache; -import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; -import com.gemstone.gemfire.internal.cache.InternalRegionArguments; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import org.apache.logging.log4j.Logger; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.standard.StandardAnalyzer; +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.asyncqueue.internal.AsyncEventQueueImpl; import com.gemstone.gemfire.cache.lucene.internal.filesystem.ChunkKey; import com.gemstone.gemfire.cache.lucene.internal.filesystem.File; import com.gemstone.gemfire.cache.lucene.internal.repository.RepositoryManager; import com.gemstone.gemfire.cache.lucene.internal.xml.LuceneIndexCreation; +import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; +import com.gemstone.gemfire.internal.cache.InternalRegionArguments; import com.gemstone.gemfire.internal.cache.PartitionedRegion; +import 
com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.internal.logging.LogService; public abstract class LuceneIndexImpl implements InternalLuceneIndex { @@ -77,7 +80,33 @@ public abstract class LuceneIndexImpl implements InternalLuceneIndex { protected void setSearchableFields(String[] fields) { searchableFieldNames = fields; } - + + /* + * For test and demo purpose. To use it, the data region should stop feeding + * A more advanced version is under-development + */ + @Override + public void waitUntilFlushed(int maxWait) { + String aeqId = LuceneServiceImpl.getUniqueIndexName(indexName, regionPath); + AsyncEventQueue queue = (AsyncEventQueue)cache.getAsyncEventQueue(aeqId); + if (queue != null) { + long start = System.nanoTime(); + while (System.nanoTime() - start < TimeUnit.MILLISECONDS.toNanos(maxWait)) { + if (0 == queue.size()) { + logger.debug("waitUntilFlushed: Queue size is 0"); + break; + } else { + try { + Thread.sleep(200); + } catch (InterruptedException e) { + } + } + } + } else { + throw new IllegalArgumentException("The AEQ does not exist for the index "+indexName+" region "+regionPath); + } + } + @Override public String[] getFieldNames() { return searchableFieldNames; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/34d37919/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java index 86a10e4..b54f51b 100644 --- a/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java +++ b/geode-lucene/src/main/java/com/gemstone/gemfire/cache/lucene/internal/xml/LuceneIndexCreation.java @@ -114,4 +114,8 @@ public class LuceneIndexCreation implements LuceneIndex, 
Extension<Region<?, ?>> public void addFieldNames(String[] fieldNames) { this.fieldNames.addAll(Arrays.asList(fieldNames)); } + + @Override + public void waitUntilFlushed(int maxWait) { + } } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/34d37919/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java ---------------------------------------------------------------------- diff --git a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java index c467a18..c7567f3 100644 --- a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/LuceneQueriesBase.java @@ -18,7 +18,7 @@ */ package com.gemstone.gemfire.cache.lucene; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.io.Serializable; import java.util.HashMap; @@ -27,6 +27,12 @@ import java.util.Map; import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.cache.Region; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.lucene.internal.LuceneEventListener; +import com.gemstone.gemfire.cache.lucene.internal.LuceneIndexImpl; +import com.gemstone.gemfire.cache.lucene.internal.LuceneServiceImpl; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientProxy; +import com.gemstone.gemfire.internal.logging.LogService; import com.gemstone.gemfire.test.dunit.Host; import com.gemstone.gemfire.test.dunit.SerializableRunnableIF; import com.gemstone.gemfire.test.dunit.VM; @@ -75,6 +81,67 @@ public abstract class LuceneQueriesBase extends JUnit4CacheTestCase { executeTextSearch(accessor); } + @Test + public void entriesFlushedToIndexAfterWaitForFlushCalled() { + SerializableRunnableIF createIndex = () -> { + LuceneService luceneService = LuceneServiceProvider.get(getCache()); 
+ luceneService.createIndex(INDEX_NAME, REGION_NAME, "text"); + }; + dataStore1.invoke(() -> initDataStore(createIndex)); + dataStore2.invoke(() -> initDataStore(createIndex)); + accessor.invoke(() -> initAccessor(createIndex)); + + try { + dataStore1.invoke(() -> setTestHook()); + putDataInRegion(accessor); + waitForFlushBeforeExecuteTextSearch(accessor, 10); + executeTextSearch(accessor); + } finally { + dataStore1.invoke(() -> checkResultAndresetTestHook()); + } + } + + protected void waitForFlushBeforeExecuteTextSearch(VM vm, final int expectKeyNum) { + vm.invoke(() -> { + Cache cache = getCache(); + Region<Object, Object> region = cache.getRegion(REGION_NAME); + + LuceneService service = LuceneServiceProvider.get(cache); + LuceneIndexImpl index = (LuceneIndexImpl)service.getIndex(INDEX_NAME, REGION_NAME); + assertNotNull(index); + LuceneQuery<Integer, TestObject> query; + + String aeqId = LuceneServiceImpl.getUniqueIndexName(INDEX_NAME, REGION_NAME); + AsyncEventQueue queue = cache.getAsyncEventQueue(aeqId); + assertNotNull(queue); + assertTrue(queue.size()>0); + index.waitUntilFlushed(30000); + return null; + }); + } + + public static void setTestHook() { + LuceneEventListener.testHook = new LuceneEventListener.TestHook() { + + @Override + public void doTestHook(String spot) { + if (spot.equals("FOUND_AND_BEFORE_PROCESSING_A_EVENT")) { + try { + Thread.sleep(1000); + LogService.getLogger().debug("Waited in test hook"); + } + catch (InterruptedException e) { + } + } + } + }; + } + + public static void checkResultAndresetTestHook() + { + LuceneEventListener.testHook = null; + } + protected void executeTextSearch(VM vm) { vm.invoke(() -> { Cache cache = getCache(); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/34d37919/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java ---------------------------------------------------------------------- diff --git 
a/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java new file mode 100755 index 0000000..edecc66 --- /dev/null +++ b/geode-lucene/src/test/java/com/gemstone/gemfire/cache/lucene/internal/LuceneIndexImplJUnitTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.gemstone.gemfire.cache.lucene.internal; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import static org.mockito.Mockito.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import com.gemstone.gemfire.cache.Cache; +import com.gemstone.gemfire.cache.asyncqueue.AsyncEventQueue; +import com.gemstone.gemfire.cache.lucene.LuceneIndex; +import com.gemstone.gemfire.test.fake.Fakes; +import com.gemstone.gemfire.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class LuceneIndexImplJUnitTest { + public static final String REGION = "region"; + public static final String INDEX = "index"; + public static final int MAX_WAIT = 30000; + private Cache cache; + LuceneIndex index; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Before + public void createLuceneIndex() { + cache = Fakes.cache(); + index = new LuceneIndexForPartitionedRegion(INDEX, REGION, cache); + } + + @Test + public void waitUnitFlushedWithMissingAEQThrowsIllegalArgument() throws Exception { + thrown.expect(IllegalArgumentException.class); + index.waitUntilFlushed(MAX_WAIT); + } + + @Test + public void waitUnitFlushedWaitsForFlush() throws Exception { + final String expectedIndexName = LuceneServiceImpl.getUniqueIndexName(INDEX, REGION); + final AsyncEventQueue queue = mock(AsyncEventQueue.class); + when(cache.getAsyncEventQueue(eq(expectedIndexName))).thenReturn(queue); + + AtomicInteger callCount = new AtomicInteger(); + when(queue.size()).thenAnswer(invocation -> { + if (callCount.get() == 0) { + // when the waitUnitFlushed() called the 2nd time, queue.size() will return 0 + callCount.incrementAndGet(); + return 2; + } else { + // when the waitUnitFlushed() called the 2nd time, queue.size() will return 0 + return 0; + } + }); + index.waitUntilFlushed(MAX_WAIT); + 
verify(cache).getAsyncEventQueue(eq(expectedIndexName)); + } + +}