This is an automated email from the ASF dual-hosted git repository. udo pushed a commit to branch feature/GEODE-3926_3 in repository https://gitbox.apache.org/repos/asf/geode.git
commit bb245b35a74f11d8b7786043fba778c58b073201 Author: Udo <[email protected]> AuthorDate: Mon Mar 26 10:51:16 2018 -0700 GEODE-3926 Initial commit to add LuceneIndexCreationInProgressException --- .../sanctioned-geode-core-serializables.txt | 1 + .../cache/lucene/internal/InternalLuceneIndex.java | 2 + .../LuceneIndexCreationInProgressException.java | 9 ++ .../internal/LuceneIndexForPartitionedRegion.java | 14 +- .../cache/lucene/internal/LuceneRawIndex.java | 17 ++- .../lucene/internal/LuceneRawIndexFactory.java | 8 +- .../internal/PartitionedRepositoryManager.java | 35 ++++- .../lucene/internal/RawIndexRepositoryFactory.java | 23 +++- .../internal/RawLuceneRepositoryManager.java | 10 +- .../lucene/LuceneIndexCreationIntegrationTest.java | 8 +- .../LuceneIndexRecoveryHAIntegrationTest.java | 5 +- .../PartitionedRepositoryManagerJUnitTest.java | 150 ++++++++++++++------- .../RawLuceneRepositoryManagerJUnitTest.java | 20 +-- 13 files changed, 223 insertions(+), 79 deletions(-) diff --git a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt index d296f77..c2d1aa4 100644 --- a/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt +++ b/geode-core/src/main/resources/org/apache/geode/internal/sanctioned-geode-core-serializables.txt @@ -803,3 +803,4 @@ org/apache/geode/security/AuthenticationFailedException,true,-820286647227908887 org/apache/geode/security/AuthenticationRequiredException,true,4675976651103154919 org/apache/geode/security/GemFireSecurityException,true,3814254578203076926,cause:java/lang/Throwable org/apache/geode/security/NotAuthorizedException,true,419215768216387745,principal:java/security/Principal +org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException,false diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java index 74e4ac8..1e95a46 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/InternalLuceneIndex.java @@ -39,4 +39,6 @@ public interface InternalLuceneIndex extends LuceneIndex { void initialize(); + boolean isIndexAvailable(int id); + } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java new file mode 100644 index 0000000..7077246 --- /dev/null +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexCreationInProgressException.java @@ -0,0 +1,9 @@ +package org.apache.geode.cache.lucene.internal; + +import org.apache.geode.GemFireException; + +public class LuceneIndexCreationInProgressException extends GemFireException { + public LuceneIndexCreationInProgressException(String message) { + super(message); + } +} diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java index 577bdef..2f3e4ee 100644 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneIndexForPartitionedRegion.java @@ -16,6 +16,7 @@ package org.apache.geode.cache.lucene.internal; import java.util.Set; +import java.util.concurrent.ExecutorService; import org.apache.geode.CancelException; import org.apache.geode.cache.AttributesFactory; @@ -49,6 +50,8 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { public static final String FILES_REGION_SUFFIX = ".files"; + private ExecutorService waitingThreadPoolFromDM; + public LuceneIndexForPartitionedRegion(String indexName, String regionPath, InternalCache cache) { super(indexName, regionPath, cache); @@ -62,7 +65,7 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { mapper = new HeterogeneousLuceneSerializer(); } PartitionedRepositoryManager partitionedRepositoryManager = - new PartitionedRepositoryManager(this, mapper); + new PartitionedRepositoryManager(this, mapper, this.waitingThreadPoolFromDM); return partitionedRepositoryManager; } @@ -202,6 +205,15 @@ public class LuceneIndexForPartitionedRegion extends LuceneIndexImpl { } } + @Override + public boolean isIndexAvailable(int id) { + PartitionedRegion fileAndChunkRegion = getFileAndChunkRegion(); + if (fileAndChunkRegion != null) { + return fileAndChunkRegion.get(IndexRepositoryFactory.APACHE_GEODE_INDEX_COMPLETE, id) != null; + } + return false; + } + private void destroyOnRemoteMembers() { PartitionedRegion pr = (PartitionedRegion) getDataRegion(); DistributionManager dm = pr.getDistributionManager(); diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java index d4168bd..6ad03c7 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndex.java @@ -22,8 +22,12 @@ import org.apache.geode.internal.cache.PartitionedRegion; public class LuceneRawIndex extends LuceneIndexImpl { - protected LuceneRawIndex(String indexName, String regionPath, InternalCache cache) { + private String luceneFolderPath; + + protected LuceneRawIndex(String indexName, String regionPath, InternalCache cache, + String luceneFolderPath) { super(indexName, regionPath, cache); + this.luceneFolderPath = luceneFolderPath; } @Override @@ -33,7 +37,8 @@ public class LuceneRawIndex extends LuceneIndexImpl { mapper = new HeterogeneousLuceneSerializer(); } RawLuceneRepositoryManager rawLuceneRepositoryManager = - new RawLuceneRepositoryManager(this, mapper); + new RawLuceneRepositoryManager(this, mapper, + cache.getDistributionManager().getWaitingThreadPool(),luceneFolderPath); return rawLuceneRepositoryManager; } @@ -49,5 +54,11 @@ public class LuceneRawIndex extends LuceneIndexImpl { } @Override - public void destroy(boolean initiator) {} + public void destroy(boolean initiator) { + } + + @Override + public boolean isIndexAvailable(int id) { + return true; + } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java index 4a92049..0c921ef 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/LuceneRawIndexFactory.java @@ -17,8 +17,14 @@ package org.apache.geode.cache.lucene.internal; import org.apache.geode.internal.cache.InternalCache; public class LuceneRawIndexFactory extends LuceneIndexImplFactory { + private String luceneFolderPath; + + public LuceneRawIndexFactory(String luceneFolderPath) { + this.luceneFolderPath = luceneFolderPath; + } + @Override public LuceneIndexImpl create(String indexName, String regionPath, InternalCache cache) { - return new LuceneRawIndex(indexName, regionPath, cache); + return new LuceneRawIndex(indexName, regionPath, cache,luceneFolderPath); } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java index f60f83b..99e4f79 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManager.java @@ -20,6 +20,9 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; + +import org.apache.logging.log4j.Logger; import org.apache.geode.InternalGemFireError; import org.apache.geode.cache.Region; @@ -32,8 +35,10 @@ import org.apache.geode.internal.cache.BucketNotFoundException; import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; +import org.apache.geode.internal.logging.LogService; public class PartitionedRepositoryManager implements RepositoryManager { + private final Logger logger = LogService.getLogger(); public static IndexRepositoryFactory indexRepositoryFactory = new IndexRepositoryFactory(); /** * map of the parent bucket region to the index repository @@ -47,17 +52,23 @@ public class PartitionedRepositoryManager implements RepositoryManager { protected final ConcurrentHashMap<Integer, IndexRepository> indexRepositories = new ConcurrentHashMap<Integer, IndexRepository>(); - /** The user region for this index */ + /** + * The user region for this index + */ protected PartitionedRegion userRegion = null; protected final LuceneSerializer serializer; protected final InternalLuceneIndex index; protected volatile boolean closed; private final CountDownLatch isDataRegionReady = new CountDownLatch(1); - public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer) { + private final ExecutorService waitingThreadPoolFromDM; + + public PartitionedRepositoryManager(InternalLuceneIndex index, LuceneSerializer serializer, + ExecutorService waitingThreadPool) { this.index = index; this.serializer = serializer; this.closed = false; + this.waitingThreadPoolFromDM = waitingThreadPool; } public void setUserRegionForRepositoryManager(PartitionedRegion userRegion) { @@ -70,13 +81,28 @@ public class PartitionedRepositoryManager implements RepositoryManager { Region<Object, Object> region = ctx.getDataSet(); Set<Integer> buckets = ((InternalRegionFunctionContext) ctx).getLocalBucketSet(region); ArrayList<IndexRepository> repos = new ArrayList<IndexRepository>(buckets.size()); + for (Integer bucketId : buckets) { BucketRegion userBucket = userRegion.getDataStore().getLocalBucketById(bucketId); if (userBucket == null) { throw new BucketNotFoundException( "User bucket was not found for region " + region + "bucket id " + bucketId); } else { - repos.add(getRepository(userBucket.getId())); + if (index.isIndexAvailable(userBucket.getId())) { + repos.add(getRepository(userBucket.getId())); + } else { + waitingThreadPoolFromDM.execute(() -> { + try { + IndexRepository repository = getRepository(userBucket.getId()); + repos.add(repository); + } catch (BucketNotFoundException e) { + logger.debug( + "Lucene Index creation still in progress. Catching BucketNotFoundException"); + } + }); + throw new LuceneIndexCreationInProgressException( + "Lucene Index creation still in progress for bucket: " + userBucket.getId()); + } } } @@ -155,7 +181,8 @@ public class PartitionedRepositoryManager implements RepositoryManager { try { computeRepository(bucketId); } catch (LuceneIndexDestroyedException e) { - /* expected exception */} + /* expected exception */ + } } } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java index 984d3eb..0a7ad42 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawIndexRepositoryFactory.java @@ -30,12 +30,19 @@ import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.PartitionedRegion; public class RawIndexRepositoryFactory extends IndexRepositoryFactory { - public RawIndexRepositoryFactory() {} + + private String luceneFolderPath = ""; + + public RawIndexRepositoryFactory() { + } @Override public IndexRepository computeIndexRepository(final Integer bucketId, LuceneSerializer serializer, - InternalLuceneIndex index, PartitionedRegion userRegion, IndexRepository oldRepository, - PartitionedRepositoryManager partitionedRepositoryManager) throws IOException { + InternalLuceneIndex index, + PartitionedRegion userRegion, + IndexRepository oldRepository, + PartitionedRepositoryManager partitionedRepositoryManager) + throws IOException { final IndexRepository repo; if (oldRepository != null) { oldRepository.cleanup(); @@ -46,7 +53,11 @@ public class RawIndexRepositoryFactory extends IndexRepositoryFactory { if (indexForRaw.withPersistence()) { String bucketLocation = LuceneServiceImpl.getUniqueIndexName(index.getName(), index.getRegionPath() + "_" + bucketId); - File location = new File(index.getName(), bucketLocation); + String + filePath = + luceneFolderPath.isEmpty() ? index.getName() + : luceneFolderPath + File.separator + index.getName(); + File location = new File(filePath, bucketLocation); if (!location.exists()) { location.mkdirs(); } @@ -60,4 +71,8 @@ public class RawIndexRepositoryFactory extends IndexRepositoryFactory { return new IndexRepositoryImpl(null, writer, serializer, indexForRaw.getIndexStats(), dataBucket, null, "", indexForRaw); } + + public void setLuceneFolderPath(String luceneFolderPath) { + this.luceneFolderPath = luceneFolderPath; + } } diff --git a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java index 0b38c45..5480e74 100755 --- a/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java +++ b/geode-lucene/src/main/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManager.java @@ -15,6 +15,7 @@ package org.apache.geode.cache.lucene.internal; import java.io.IOException; +import java.util.concurrent.ExecutorService; import org.apache.geode.cache.lucene.LuceneSerializer; import org.apache.geode.cache.lucene.internal.repository.IndexRepository; @@ -22,10 +23,13 @@ import org.apache.geode.internal.cache.BucketNotFoundException; import org.apache.geode.internal.cache.PartitionedRegion; public class RawLuceneRepositoryManager extends PartitionedRepositoryManager { - public static IndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory(); + public static RawIndexRepositoryFactory indexRepositoryFactory = new RawIndexRepositoryFactory(); - public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer) { - super(index, serializer); + public RawLuceneRepositoryManager(LuceneIndexImpl index, LuceneSerializer serializer, + ExecutorService waitingThreadPool, + String luceneFolderPath) { + super(index, serializer,waitingThreadPool); + indexRepositoryFactory.setLuceneFolderPath(luceneFolderPath); } @Override diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java index 55c2222..2f061b0 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/LuceneIndexCreationIntegrationTest.java @@ -69,6 +69,7 @@ import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.test.junit.categories.IntegrationTest; import org.apache.geode.test.junit.categories.LuceneTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; /** * Tests of creating lucene indexes on regions. All tests of index creation use cases should be in @@ -85,6 +86,9 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest { @Rule public ExpectedException expectedException = ExpectedException.none(); + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + @Test public void shouldCreateIndexWriterWithAnalyzersWhenSettingPerFieldAnalyzers() @@ -162,14 +166,14 @@ public class LuceneIndexCreationIntegrationTest extends LuceneIntegrationTest { @Test public void shouldCreateRawIndexIfSpecifiedItsFactory() - throws BucketNotFoundException, InterruptedException { + throws BucketNotFoundException, InterruptedException, IOException { Map<String, Analyzer> analyzers = new HashMap<>(); final RecordingAnalyzer field1Analyzer = new RecordingAnalyzer(); final RecordingAnalyzer field2Analyzer = new RecordingAnalyzer(); analyzers.put("field1", field1Analyzer); analyzers.put("field2", field2Analyzer); - LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(); + LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(temporaryFolder.newFolder("lucene").getPath()); try { luceneService.createIndexFactory().setFields(analyzers).create(INDEX_NAME, REGION_NAME); Region region = createRegion(); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java index 0f67cb6..a0753d1 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/LuceneIndexRecoveryHAIntegrationTest.java @@ -19,6 +19,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.Analyzer; @@ -91,7 +92,7 @@ public class LuceneIndexRecoveryHAIntegrationTest { userRegion.put("rebalance", "test"); service.waitUntilFlushed("index1", "userRegion", 30000, TimeUnit.MILLISECONDS); - RepositoryManager manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper); + RepositoryManager manager = new PartitionedRepositoryManager(index, mapper,Executors.newSingleThreadExecutor()); IndexRepository repo = manager.getRepository(userRegion, 0, null); assertNotNull(repo); @@ -106,7 +107,7 @@ public class LuceneIndexRecoveryHAIntegrationTest { userRegion = (PartitionedRegion) regionfactory.create("userRegion"); userRegion.put("rebalance", "test"); - manager = new PartitionedRepositoryManager((LuceneIndexImpl) index, mapper); + manager = new PartitionedRepositoryManager(index, mapper,Executors.newSingleThreadExecutor()); IndexRepository newRepo = manager.getRepository(userRegion, 0, null); Assert.assertNotEquals(newRepo, repo); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java index b2c30b1..06ec0ce 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/PartitionedRepositoryManagerJUnitTest.java @@ -29,20 +29,24 @@ import java.io.IOException; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.index.IndexWriter; +import org.awaitility.Awaitility; import org.junit.After; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.mockito.Mockito; -import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -64,13 +68,13 @@ import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.LocalRegion; import org.apache.geode.internal.cache.PartitionRegionConfig; import org.apache.geode.internal.cache.PartitionedRegion; -import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper; import org.apache.geode.internal.cache.PartitionedRegionDataStore; import org.apache.geode.internal.cache.PartitionedRegionHelper; import org.apache.geode.internal.cache.execute.InternalRegionFunctionContext; import org.apache.geode.test.fake.Fakes; import org.apache.geode.test.junit.categories.LuceneTest; import org.apache.geode.test.junit.categories.UnitTest; +import org.apache.geode.test.junit.rules.serializable.SerializableTemporaryFolder; @Category({UnitTest.class, LuceneTest.class}) @RunWith(PowerMockRunner.class) @@ -83,20 +87,24 @@ public class PartitionedRepositoryManagerJUnitTest { protected LuceneSerializer serializer; protected PartitionedRegionDataStore userDataStore; protected PartitionedRegionDataStore fileDataStore; - protected PartitionedRegionHelper prHelper; protected PartitionRegionConfig prConfig; protected LocalRegion prRoot; - protected Map<Integer, BucketRegion> fileAndChunkBuckets = new HashMap<Integer, BucketRegion>(); - protected Map<Integer, BucketRegion> dataBuckets = new HashMap<Integer, BucketRegion>(); + protected Map<Integer, BucketRegion> fileAndChunkBuckets = new HashMap<>(); + protected Map<Integer, BucketRegion> dataBuckets = new HashMap<>(); protected LuceneIndexStats indexStats; protected FileSystemStats fileSystemStats; protected LuceneIndexImpl indexForPR; protected PartitionedRepositoryManager repoManager; protected GemFireCacheImpl cache; + private final Map<Integer, Boolean> isIndexAvailableMap = new HashMap<>(); + + @Rule + public SerializableTemporaryFolder temporaryFolder = new SerializableTemporaryFolder(); + @Before - public void setUp() { + public void setUp() throws IOException { cache = Fakes.cache(); userRegion = Mockito.mock(PartitionedRegion.class); userDataStore = Mockito.mock(PartitionedRegionDataStore.class); @@ -117,7 +125,10 @@ public class PartitionedRepositoryManagerJUnitTest { DLockService.removeLockServiceForTests(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); } - protected void createIndexAndRepoManager() { + protected void createIndexAndRepoManager() throws IOException { + String luceneFolderPath = temporaryFolder.newFolder("lucene").getPath(); + LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(luceneFolderPath); + fileAndChunkRegion = Mockito.mock(PartitionedRegion.class); fileDataStore = Mockito.mock(PartitionedRegionDataStore.class); when(fileAndChunkRegion.getDataStore()).thenReturn(fileDataStore); @@ -142,13 +153,14 @@ public class PartitionedRepositoryManagerJUnitTest { when(prRoot.get("rid")).thenReturn(prConfig); PowerMockito.mockStatic(PartitionedRegionHelper.class); PowerMockito.when(PartitionedRegionHelper.getPRRoot(cache)).thenReturn(prRoot); - repoManager = new PartitionedRepositoryManager(indexForPR, serializer); + repoManager = new PartitionedRepositoryManager(indexForPR, serializer, + Executors.newSingleThreadExecutor()); repoManager.setUserRegionForRepositoryManager(userRegion); repoManager.allowRepositoryComputation(); } @Test - public void getByKey() throws BucketNotFoundException, IOException { + public void getByKey() throws BucketNotFoundException { setUpMockBucket(0); setUpMockBucket(1); @@ -165,8 +177,8 @@ public class PartitionedRepositoryManagerJUnitTest { assertEquals(repo0, repo113); assertNotEquals(repo0, repo1); - checkRepository(repo0, 0); - checkRepository(repo1, 1); + checkRepositoryContainsBucket(repo0, 0); + checkRepositoryContainsBucket(repo1, 1); } /** @@ -174,14 +186,14 @@ public class PartitionedRepositoryManagerJUnitTest { */ @Test public void destroyBucketShouldCreateNewIndexRepository() - throws BucketNotFoundException, IOException { + throws BucketNotFoundException { setUpMockBucket(0); IndexRepositoryImpl repo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); assertNotNull(repo0); - checkRepository(repo0, 0); + checkRepositoryContainsBucket(repo0, 0); BucketRegion fileBucket0 = fileAndChunkBuckets.get(0); BucketRegion dataBucket0 = dataBuckets.get(0); @@ -194,7 +206,7 @@ public class PartitionedRepositoryManagerJUnitTest { IndexRepositoryImpl newRepo0 = (IndexRepositoryImpl) repoManager.getRepository(userRegion, 0, null); assertNotEquals(repo0, newRepo0); - checkRepository(newRepo0, 0); + checkRepositoryContainsBucket(newRepo0, 0); assertTrue(repo0.isClosed()); assertFalse(repo0.getWriter().isOpen()); } @@ -213,45 +225,19 @@ public class PartitionedRepositoryManagerJUnitTest { when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(null); - when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), (RetryTimeKeeper) any())) - .then(new Answer() { - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0)); - return null; - } + when(fileAndChunkRegion.getOrCreateNodeForBucketWrite(eq(0), any())) + .then((Answer) invocation -> { + when(fileDataStore.getLocalBucketById(eq(0))).thenReturn(fileAndChunkBuckets.get(0)); + return null; }); assertNotNull(repoManager.getRepository(userRegion, 0, null)); } - @Test - public void getByRegion() throws BucketNotFoundException { - setUpMockBucket(0); - setUpMockBucket(1); - - Set<Integer> buckets = new LinkedHashSet<Integer>(Arrays.asList(0, 1)); - InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class); - when(ctx.getLocalBucketSet((any()))).thenReturn(buckets); - Collection<IndexRepository> repos = repoManager.getRepositories(ctx); - assertEquals(2, repos.size()); - - Iterator<IndexRepository> itr = repos.iterator(); - IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next(); - IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next(); - - assertNotNull(repo0); - assertNotNull(repo1); - assertNotEquals(repo0, repo1); - - checkRepository(repo0, 0); - checkRepository(repo1, 1); - } - /** * Test that we get the expected exception when a user bucket is missing */ - @Test(expected = BucketNotFoundException.class) + @Test(expected = LuceneIndexCreationInProgressException.class) public void getMissingBucketByRegion() throws BucketNotFoundException { setUpMockBucket(0); @@ -262,15 +248,21 @@ public class PartitionedRepositoryManagerJUnitTest { repoManager.getRepositories(ctx); } - protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) { + protected void checkRepositoryContainsBucket(IndexRepositoryImpl repo0, int... bucketIds) { IndexWriter writer0 = repo0.getWriter(); RegionDirectory dir0 = (RegionDirectory) writer0.getDirectory(); - assertEquals(new BucketTargetingMap(fileAndChunkBuckets.get(bucketId), bucketId), - dir0.getFileSystem().getFileAndChunkRegion()); + boolean result = false; + for (int bucketId : bucketIds) { + BucketTargetingMap bucketTargetingMap = + new BucketTargetingMap(fileAndChunkBuckets.get(bucketId), bucketId); + result |= bucketTargetingMap.equals(dir0.getFileSystem().getFileAndChunkRegion()); + } + + assertTrue(result); assertEquals(serializer, repo0.getSerializer()); } - protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException { + protected void setUpMockBucket(int id) { BucketRegion mockBucket = Mockito.mock(BucketRegion.class); BucketRegion fileAndChunkBucket = Mockito.mock(BucketRegion.class); // Allowing the fileAndChunkBucket to behave like a map so that the IndexWriter operations don't @@ -290,6 +282,64 @@ public class PartitionedRepositoryManagerJUnitTest { BucketAdvisor mockBucketAdvisor = Mockito.mock(BucketAdvisor.class); when(fileAndChunkBucket.getBucketAdvisor()).thenReturn(mockBucketAdvisor); when(mockBucketAdvisor.isPrimary()).thenReturn(true); - return mockBucket; } + + @Test + public void queryByRegionWaitingForRepoToBeCreated() { + setUpMockBucket(0); + setUpMockBucket(1); + + setupIsIndexAvailable(); + + Set<Integer> buckets = new LinkedHashSet<>(Arrays.asList(0, 1)); + InternalRegionFunctionContext ctx = Mockito.mock(InternalRegionFunctionContext.class); + when(ctx.getLocalBucketSet((any()))).thenReturn(buckets); + final Collection<IndexRepository> repositories = new HashSet<>(); + + Awaitility.await().pollDelay(1, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS) + .atMost(500, TimeUnit.SECONDS).until(() -> { + try { + repositories.addAll(repoManager.getRepositories(ctx)); + } catch (BucketNotFoundException | LuceneIndexCreationInProgressException e) { + } + return repositories.size() == 2; + }); + + Iterator<IndexRepository> itr = repositories.iterator(); + IndexRepositoryImpl repo0 = (IndexRepositoryImpl) itr.next(); + IndexRepositoryImpl repo1 = (IndexRepositoryImpl) itr.next(); + + assertNotNull(repo0); + assertNotNull(repo1); + assertNotEquals(repo0, repo1); + + checkRepositoryContainsBucket(repo0, 0, 1); + checkRepositoryContainsBucket(repo1, 0, 1); + } + + private void setupIsIndexAvailable() { + when(indexForPR.isIndexAvailable(1)).then((Answer) invocation -> { + boolean result; + Boolean isAvailable = isIndexAvailableMap.get(1); + if (isAvailable == null || !isAvailable) { + isIndexAvailableMap.put(1, true); + result = false; + } else { + result = true; + } + return result; + }); + when(indexForPR.isIndexAvailable(0)).then((Answer) invocation -> { + boolean result; + Boolean isAvailable = isIndexAvailableMap.get(0); + if (isAvailable == null || !isAvailable) { + isIndexAvailableMap.put(0, true); + result = false; + } else { + result = true; + } + return result; + }); + } + } diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java index a000d2f..818b416 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/RawLuceneRepositoryManagerJUnitTest.java @@ -19,6 +19,9 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.util.concurrent.Executors; + import org.apache.lucene.analysis.standard.StandardAnalyzer; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.store.Directory; @@ -35,12 +38,11 @@ import org.apache.geode.internal.cache.BucketRegion; import org.apache.geode.internal.cache.PartitionedRegion; import org.apache.geode.internal.cache.PartitionedRegionDataStore; import org.apache.geode.test.fake.Fakes; -import org.apache.geode.test.junit.categories.LuceneTest; public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryManagerJUnitTest { @Before - public void setUp() { + public void setUp() throws IOException { cache = Fakes.cache(); userRegion = Mockito.mock(PartitionedRegion.class); @@ -53,11 +55,12 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa @After public void tearDown() { - ((RawLuceneRepositoryManager) repoManager).close(); + repoManager.close(); } - protected void createIndexAndRepoManager() { - LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(); + protected void createIndexAndRepoManager() throws IOException { + String luceneFolderPath = temporaryFolder.newFolder("lucene").getPath(); + LuceneServiceImpl.luceneIndexFactory = new LuceneRawIndexFactory(luceneFolderPath); indexStats = Mockito.mock(LuceneIndexStats.class); indexForPR = Mockito.mock(LuceneRawIndex.class); @@ -66,7 +69,7 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa when(indexForPR.getCache()).thenReturn(cache); when(indexForPR.getRegionPath()).thenReturn("/testRegion"); when(indexForPR.withPersistence()).thenReturn(true); - repoManager = new RawLuceneRepositoryManager(indexForPR, serializer); + repoManager = new RawLuceneRepositoryManager(indexForPR, serializer,Executors.newSingleThreadExecutor(), luceneFolderPath); repoManager.setUserRegionForRepositoryManager(userRegion); repoManager.allowRepositoryComputation(); } @@ -78,14 +81,14 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa } @Override - protected void checkRepository(IndexRepositoryImpl repo0, int bucketId) { + protected void checkRepositoryContainsBucket(IndexRepositoryImpl repo0, int... bucketIds) { IndexWriter writer0 = repo0.getWriter(); Directory dir0 = writer0.getDirectory(); assertTrue(dir0 instanceof NIOFSDirectory); } @Override - protected BucketRegion setUpMockBucket(int id) throws BucketNotFoundException { + protected void setUpMockBucket(int id) { BucketRegion mockBucket = Mockito.mock(BucketRegion.class); when(mockBucket.getId()).thenReturn(id); when(userRegion.getBucketRegion(eq(id), eq(null))).thenReturn(mockBucket); @@ -95,7 +98,6 @@ public class RawLuceneRepositoryManagerJUnitTest extends PartitionedRepositoryMa dataBuckets.put(id, mockBucket); repoManager.computeRepository(mockBucket.getId()); - return mockBucket; } @Test -- To stop receiving notification emails like this one, please contact [email protected].
